// risingwave_connector/source/filesystem/opendal_source/opendal_reader.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::future::IntoFuture;
16use std::pin::Pin;
17
18use async_compression::tokio::bufread::GzipDecoder;
19use async_trait::async_trait;
20use futures::TryStreamExt;
21use futures_async_stream::try_stream;
22use opendal::Operator;
23use prometheus::core::GenericCounter;
24use risingwave_common::array::StreamChunk;
25use risingwave_common::metrics::LabelGuardedMetric;
26use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
27use tokio_util::io::StreamReader;
28
29use super::OpendalSource;
30use super::opendal_enumerator::OpendalEnumerator;
31use crate::error::ConnectorResult;
32use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig};
33use crate::source::filesystem::OpendalFsSplit;
34use crate::source::filesystem::file_common::CompressionFormat;
35use crate::source::filesystem::nd_streaming::need_nd_streaming;
36use crate::source::iceberg::read_parquet_file;
37use crate::source::{
38    BoxSourceChunkStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
39    SplitReader,
40};
41
/// A [`SplitReader`] over files accessed through an OpenDAL [`Operator`].
///
/// Holds everything needed to turn the assigned splits into a chunk stream
/// in `into_stream_inner`.
#[derive(Debug, Clone)]
pub struct OpendalReader<Src: OpendalSource> {
    // Enumerator owning the OpenDAL operator (`connector.op`) used to read objects.
    connector: OpendalEnumerator<Src>,
    // File splits assigned to this reader; consumed one by one when streaming.
    splits: Vec<OpendalFsSplit<Src>>,
    // Parser configuration; its encoding config selects the parquet vs. line path.
    parser_config: ParserConfig,
    // Shared source context: metrics, control options (chunk size), and ids.
    source_ctx: SourceContextRef,
    // Optional column projection, forwarded to `read_parquet_file`.
    columns: Option<Vec<Column>>,
}
50
51#[async_trait]
52impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
53    type Properties = Src::Properties;
54    type Split = OpendalFsSplit<Src>;
55
56    async fn new(
57        properties: Src::Properties,
58        splits: Vec<OpendalFsSplit<Src>>,
59        parser_config: ParserConfig,
60        source_ctx: SourceContextRef,
61        columns: Option<Vec<Column>>,
62    ) -> ConnectorResult<Self> {
63        let connector = Src::new_enumerator(properties)?;
64        let opendal_reader = OpendalReader {
65            connector,
66            splits,
67            parser_config,
68            source_ctx,
69            columns,
70        };
71        Ok(opendal_reader)
72    }
73
74    fn into_stream(self) -> BoxSourceChunkStream {
75        self.into_stream_inner()
76    }
77}
78
79impl<Src: OpendalSource> OpendalReader<Src> {
80    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
81    async fn into_stream_inner(self) {
82        for split in self.splits {
83            let source_ctx = self.source_ctx.clone();
84
85            let object_name = split.name.clone();
86            let actor_id = source_ctx.actor_id.to_string();
87            let fragment_id = source_ctx.fragment_id.to_string();
88            let source_id = source_ctx.source_id.to_string();
89            let source_name = source_ctx.source_name.clone();
90            let file_source_input_row_count = self
91                .source_ctx
92                .metrics
93                .file_source_input_row_count
94                .with_guarded_label_values(&[&source_id, &source_name, &actor_id, &fragment_id]);
95            let chunk_stream;
96            if let EncodingProperties::Parquet(parquet_props) =
97                &self.parser_config.specific.encoding_config
98            {
99                let actor_id = source_ctx.actor_id.to_string();
100                let source_id = source_ctx.source_id.to_string();
101                let split_id = split.id();
102                let source_name = source_ctx.source_name.clone();
103                let parquet_source_skip_row_count_metrics: LabelGuardedMetric<
104                    GenericCounter<prometheus::core::AtomicU64>,
105                > = self
106                    .source_ctx
107                    .metrics
108                    .parquet_source_skip_row_count
109                    .with_guarded_label_values(&[
110                        actor_id.as_str(),
111                        source_id.as_str(),
112                        &split_id,
113                        source_name.as_str(),
114                    ]);
115                chunk_stream = read_parquet_file(
116                    self.connector.op.clone(),
117                    object_name.clone(),
118                    self.columns.clone(),
119                    Some(self.parser_config.common.rw_columns.clone()),
120                    parquet_props.case_insensitive,
121                    self.source_ctx.source_ctrl_opts.chunk_size,
122                    split.offset,
123                    Some(file_source_input_row_count.clone()),
124                    Some(parquet_source_skip_row_count_metrics),
125                )
126                .await?;
127            } else {
128                assert!(
129                    need_nd_streaming(&self.parser_config.specific.encoding_config),
130                    "except for parquet, file source only support split by newline for now"
131                );
132
133                let line_stream = Self::stream_read_lines(
134                    self.connector.op.clone(),
135                    split,
136                    self.source_ctx.clone(),
137                    self.connector.compression_format.clone(),
138                    file_source_input_row_count.clone(),
139                );
140
141                let parser =
142                    ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)
143                        .await?;
144                chunk_stream = Box::pin(parser.parse_stream(line_stream));
145            }
146
147            #[for_await]
148            for chunk in chunk_stream {
149                yield chunk?;
150            }
151        }
152    }
153
154    #[try_stream(boxed, ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
155    pub async fn stream_read_lines(
156        op: Operator,
157        split: OpendalFsSplit<Src>,
158        source_ctx: SourceContextRef,
159        compression_format: CompressionFormat,
160        file_source_input_row_count_metrics: LabelGuardedMetric<
161            GenericCounter<prometheus::core::AtomicU64>,
162        >,
163    ) {
164        let actor_id = source_ctx.actor_id.to_string();
165        let fragment_id = source_ctx.fragment_id.to_string();
166        let source_id = source_ctx.source_id.to_string();
167        let source_name = source_ctx.source_name.clone();
168        let split_id = split.id();
169        let object_name = split.name.clone();
170        let start_offset = split.offset;
171        // After a recovery occurs, for gzip-compressed files, it is necessary to read from the beginning each time,
172        // other files can continue reading from the last read `start_offset`.
173        let reader = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
174            true => op.read_with(&object_name).into_future().await?,
175
176            false => {
177                op.read_with(&object_name)
178                    .range(start_offset as u64..)
179                    .into_future()
180                    .await?
181            }
182        };
183
184        let stream_reader = StreamReader::new(reader.map_err(std::io::Error::other));
185
186        let mut buf_reader: Pin<Box<dyn AsyncBufRead + Send>> = match compression_format {
187            CompressionFormat::Gzip => {
188                let gzip_decoder = GzipDecoder::new(stream_reader);
189                Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
190            }
191            CompressionFormat::None => {
192                // todo: support automatic decompression of more compression types.
193                if object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
194                    let gzip_decoder = GzipDecoder::new(stream_reader);
195                    Box::pin(BufReader::new(gzip_decoder)) as Pin<Box<dyn AsyncBufRead + Send>>
196                } else {
197                    Box::pin(BufReader::new(stream_reader)) as Pin<Box<dyn AsyncBufRead + Send>>
198                }
199            }
200        };
201
202        let mut offset = match object_name.ends_with(".gz") || object_name.ends_with(".gzip") {
203            true => 0,
204            false => start_offset,
205        };
206        let partition_input_bytes_metrics = source_ctx
207            .metrics
208            .partition_input_bytes
209            .with_guarded_label_values(&[
210                actor_id.as_str(),
211                source_id.as_str(),
212                &split_id,
213                source_name.as_str(),
214                fragment_id.as_str(),
215            ]);
216
217        let max_chunk_size = source_ctx.source_ctrl_opts.chunk_size;
218        let mut batch = Vec::with_capacity(max_chunk_size);
219        let mut line_buf = String::new();
220
221        loop {
222            let n_read = buf_reader.read_line(&mut line_buf).await?;
223            if n_read == 0 {
224                // EOF
225                break;
226            }
227            let msg_offset = (offset + n_read).to_string();
228            // note that the buffer contains the newline character
229            debug_assert_eq!(n_read, line_buf.len());
230            if (object_name.ends_with(".gz") || object_name.ends_with(".gzip"))
231                && offset + n_read <= start_offset
232            {
233                // For gzip compressed files, the reader needs to read from the beginning each time,
234                // but it needs to skip the previously read part and start yielding chunks from a position greater than or equal to start_offset.
235            } else {
236                batch.push(SourceMessage {
237                    key: None,
238                    payload: Some(std::mem::take(&mut line_buf).into_bytes()),
239                    offset: msg_offset,
240                    split_id: split.id(),
241                    meta: SourceMeta::Empty,
242                });
243            }
244
245            offset += n_read;
246            partition_input_bytes_metrics.inc_by(n_read as _);
247
248            if batch.len() >= max_chunk_size {
249                file_source_input_row_count_metrics.inc_by(max_chunk_size as _);
250                yield std::mem::replace(&mut batch, Vec::with_capacity(max_chunk_size));
251            }
252        }
253
254        if !batch.is_empty() {
255            batch.shrink_to_fit();
256            file_source_input_row_count_metrics.inc_by(batch.len() as _);
257            yield batch;
258        }
259    }
260}