risingwave_connector/source/iceberg/parquet_file_handler.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future::IntoFuture;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

use anyhow::anyhow;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, TryFutureExt};
use iceberg::io::{
    FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use iceberg::{Error, ErrorKind};
use itertools::Itertools;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::{Azblob, Gcs, S3};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, parquet_to_arrow_schema};
use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
use url::Url;

use crate::error::ConnectorResult;
use crate::parser::ParquetParser;
use crate::source::{Column, SourceColumnDesc};

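/// Adapter that exposes an Iceberg `FileRead` as a parquet `AsyncFileReader`, so a file opened
/// through Iceberg's `FileIO` can be consumed by `ParquetRecordBatchStreamBuilder`.
/// `get_bytes` delegates ranged reads to the inner reader, and `get_metadata` loads the Parquet
/// footer using the file size recorded in `FileMetadata`.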
pub struct ParquetFileReader<R: FileRead> {
    meta: FileMetadata,
    r: R,
}

impl<R: FileRead> ParquetFileReader<R> {
    pub fn new(meta: FileMetadata, r: R) -> Self {
        Self { meta, r }
    }
}

impl<R: FileRead> AsyncFileReader for ParquetFileReader<R> {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start as _..range.end as _)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new();
            let size = self.meta.size as usize;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}

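/// Builds a `ParquetRecordBatchStreamBuilder` for a Parquet file stored on S3, using Iceberg's
/// `FileIO` for the underlying I/O.
///
/// A minimal usage sketch (not compiled; the credentials and object location are placeholders):
///
/// ```ignore
/// let builder = create_parquet_stream_builder(
///     "us-east-1".to_owned(),
///     "my-access-key".to_owned(),
///     "my-secret-key".to_owned(),
///     "s3://my-bucket/path/to/file.parquet".to_owned(),
/// )
/// .await?;
/// let record_batch_stream = builder.build()?;
/// ```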
pub async fn create_parquet_stream_builder(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    location: String,
) -> Result<ParquetRecordBatchStreamBuilder<ParquetFileReader<impl FileRead>>, anyhow::Error> {
    let mut props = HashMap::new();
    props.insert(S3_REGION, s3_region.clone());
    props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone());
    props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone());

    let file_io_builder = FileIOBuilder::new("s3");
    let file_io = file_io_builder
        .with_props(props.into_iter())
        .build()
        .map_err(|e| anyhow!(e))?;
    let parquet_file = file_io.new_input(&location).map_err(|e| anyhow!(e))?;

    let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?;
    let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?;
    let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);

    ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
        .await
        .map_err(|e| anyhow!(e))
}

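/// Creates an opendal `Operator` for S3 with logging and retry layers attached.
///
/// A minimal usage sketch (not compiled; region, credentials, bucket, and endpoint are
/// placeholders):
///
/// ```ignore
/// let op = new_s3_operator(
///     "us-east-1".to_owned(),
///     "my-access-key".to_owned(),
///     "my-secret-key".to_owned(),
///     "my-bucket".to_owned(),
///     "https://s3.us-east-1.amazonaws.com".to_owned(),
/// )?;
/// ```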
pub fn new_s3_operator(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    bucket: String,
    s3_endpoint: String,
) -> ConnectorResult<Operator> {
    let mut builder = S3::default();
    builder = builder
        .region(&s3_region)
        .endpoint(&s3_endpoint)
        .access_key_id(&s3_access_key)
        .secret_access_key(&s3_secret_key)
        .bucket(&bucket)
        .disable_config_load();
    let op: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();

    Ok(op)
}

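/// Creates an opendal `Operator` for GCS with logging and retry layers attached.
///
/// A minimal usage sketch (not compiled; the credential string and bucket are placeholders):
///
/// ```ignore
/// let op = new_gcs_operator(gcs_credential, "my-bucket".to_owned())?;
/// ```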
pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult<Operator> {
    // Create gcs builder.
    let builder = Gcs::default().bucket(&bucket).credential(&credential);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

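/// Creates an opendal `Operator` for Azure Blob Storage with logging and retry layers attached.
///
/// A minimal usage sketch (not compiled; endpoint, account, and container are placeholders):
///
/// ```ignore
/// let op = new_azblob_operator(
///     "https://myaccount.blob.core.windows.net".to_owned(),
///     "myaccount".to_owned(),
///     "my-account-key".to_owned(),
///     "my-container".to_owned(),
/// )?;
/// ```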
pub fn new_azblob_operator(
    endpoint: String,
    account_name: String,
    account_key: String,
    container_name: String,
) -> ConnectorResult<Operator> {
    // Create azblob builder.
    let mut builder = Azblob::default();
    builder = builder
        .container(&container_name)
        .endpoint(&endpoint)
        .account_name(&account_name)
        .account_key(&account_key);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

#[derive(Debug, Clone)]
pub enum FileScanBackend {
    S3,
    Gcs,
    Azblob,
}

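/// Splits an object-store location of the form `<scheme>://<bucket>/<path>` into the bucket name
/// and the path inside the bucket.
///
/// A minimal example (not compiled):
///
/// ```ignore
/// let (bucket, file_name) =
///     extract_bucket_and_file_name("s3://my-bucket/dir/data.parquet", &FileScanBackend::S3)?;
/// assert_eq!(bucket, "my-bucket");
/// assert_eq!(file_name, "dir/data.parquet");
/// ```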
pub fn extract_bucket_and_file_name(
    location: &str,
    file_scan_backend: &FileScanBackend,
) -> ConnectorResult<(String, String)> {
    let url = Url::parse(location)?;
    let bucket = url
        .host_str()
        .ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                format!("Invalid url: {}, missing bucket", location),
            )
        })?
        .to_owned();
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    let file_name = location[prefix.len()..].to_string();
    Ok((bucket, file_name))
}

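/// Lists the objects under `dir` and returns their fully qualified locations, using the prefix
/// implied by `file_scan_backend`.
///
/// A minimal usage sketch (not compiled; `op` is an operator built for the same bucket):
///
/// ```ignore
/// let files = list_data_directory(
///     op.clone(),
///     "s3://my-bucket/data/".to_owned(),
///     &FileScanBackend::S3,
/// )
/// .await?;
/// // `files` contains entries such as "s3://my-bucket/data/part-0.parquet".
/// ```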
pub async fn list_data_directory(
    op: Operator,
    dir: String,
    file_scan_backend: &FileScanBackend,
) -> Result<Vec<String>, anyhow::Error> {
    let (bucket, file_name) = extract_bucket_and_file_name(&dir, file_scan_backend)?;
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    if dir.starts_with(&prefix) {
        op.list(&file_name)
            .await
            .map_err(|e| anyhow!(e))
            .map(|list| {
                list.into_iter()
                    .map(|entry| prefix.clone() + entry.path())
                    .collect()
            })
    } else {
        Err(Error::new(
            ErrorKind::DataInvalid,
            format!("Invalid url: {}, should start with {}", dir, prefix),
        ))?
    }
}

/// Extracts a suitable `ProjectionMask` from a Parquet file schema based on the user's requested schema.
///
/// This function is utilized for column pruning of Parquet files. It checks the user's requested schema
/// against the schema of the currently read Parquet file. If the provided `columns` are `None`
/// or if the Parquet file contains nested data types, it returns `ProjectionMask::all()`. Otherwise,
/// it returns only the columns where both the data type and column name match the requested schema,
/// facilitating efficient reading of the `RecordBatch`.
///
/// # Parameters
/// - `columns`: An optional vector of `Column` representing the user's requested schema.
/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file.
///
/// # Returns
/// - A `ConnectorResult<ProjectionMask>`, which represents the valid columns in the Parquet file schema
///   that correspond to the requested schema. If an error occurs during processing, it returns an
///   appropriate error.
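///
/// # Example
/// A usage sketch (not compiled), mirroring how `read_parquet_file` below applies the mask:
///
/// ```ignore
/// let file_metadata = parquet_metadata.file_metadata();
/// let mask = get_project_mask(rw_columns, file_metadata)?;
/// let stream = ParquetRecordBatchStreamBuilder::new(reader)
///     .await?
///     .with_projection(mask)
///     .build()?;
/// ```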
pub fn get_project_mask(
    columns: Option<Vec<Column>>,
    metadata: &FileMetaData,
) -> ConnectorResult<ProjectionMask> {
    match columns {
        Some(rw_columns) => {
            let root_column_names = metadata
                .schema_descr()
                .root_schema()
                .get_fields()
                .iter()
                .map(|field| field.name())
                .collect_vec();

            let converted_arrow_schema =
                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
                    .map_err(anyhow::Error::from)?;
            let valid_column_indices: Vec<usize> = rw_columns
                .iter()
                .filter_map(|column| {
                    root_column_names
                        .iter()
                        .position(|&name| name == column.name)
                        .and_then(|pos| {
                            let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_iceberg::DataType = converted_arrow_schema.field_with_name(&column.name).ok()?.data_type();
                            let rw_data_type: &risingwave_common::types::DataType = &column.data_type;
                            if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) {
                                Some(pos)
                            } else {
                                None
                            }
                        })
                })
                .collect();

            Ok(ProjectionMask::roots(
                metadata.schema_descr(),
                valid_column_indices,
            ))
        }
        None => Ok(ProjectionMask::all()),
    }
}

/// Reads a specified Parquet file and converts its content into a stream of chunks.
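///
/// A minimal usage sketch (not compiled; `op` and the file name are placeholders, and both
/// metrics are omitted):
///
/// ```ignore
/// use futures::StreamExt;
///
/// let mut chunk_stream =
///     read_parquet_file(op, "data/file.parquet".to_owned(), None, None, 1024, 0, None, None)
///         .await?;
/// while let Some(chunk) = chunk_stream.next().await {
///     let chunk: StreamChunk = chunk?;
///     // ... handle the chunk
/// }
/// ```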
pub async fn read_parquet_file(
    op: Operator,
    file_name: String,
    rw_columns: Option<Vec<Column>>,
    parser_columns: Option<Vec<SourceColumnDesc>>,
    batch_size: usize,
    offset: usize,
    file_source_input_row_count_metrics: Option<
        risingwave_common::metrics::LabelGuardedMetric<
            prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
            4,
        >,
    >,
    parquet_source_skip_row_count_metrics: Option<
        risingwave_common::metrics::LabelGuardedMetric<
            prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
            4,
        >,
    >,
) -> ConnectorResult<
    Pin<Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>>,
> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future() // Unlike `rustc`, `try_stream` seems to require a manual `into_future` here.
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;

    let file_metadata = parquet_metadata.file_metadata();
    let projection_mask = get_project_mask(rw_columns, file_metadata)?;

    // For the Parquet format, we convert each record batch directly into a stream chunk.
    // Therefore, the offset of the Parquet file is the number of rows already read from it.
    let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(batch_size)
        .with_projection(projection_mask)
        .with_offset(offset)
        .build()?;
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )
    .map_err(anyhow::Error::from)?;
    let columns = match parser_columns {
        Some(columns) => columns,
        None => converted_arrow_schema
            .fields
            .iter()
            .enumerate()
            .map(|(index, field_ref)| {
                let data_type = IcebergArrowConvert.type_from_field(field_ref).unwrap();
                let column_desc = ColumnDesc::named(
                    field_ref.name().clone(),
                    ColumnId::new(index as i32),
                    data_type,
                );
                SourceColumnDesc::from(&column_desc)
            })
            .collect(),
    };
    let parquet_parser = ParquetParser::new(columns, file_name, offset)?;
    let msg_stream: Pin<
        Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>,
    > = parquet_parser.into_stream(
        record_batch_stream,
        file_source_input_row_count_metrics,
        parquet_source_skip_row_count_metrics,
    );
    Ok(msg_stream)
}

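/// Reads the Parquet metadata of `file_name` and returns the corresponding Arrow fields.
///
/// A minimal usage sketch (not compiled; `op` and the file name are placeholders):
///
/// ```ignore
/// let fields = get_parquet_fields(op, "data/file.parquet".to_owned()).await?;
/// for field in fields.iter() {
///     println!("{}: {:?}", field.name(), field.data_type());
/// }
/// ```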
pub async fn get_parquet_fields(
    op: Operator,
    file_name: String,
) -> ConnectorResult<risingwave_common::array::arrow::arrow_schema_iceberg::Fields> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future() // Unlike `rustc`, `try_stream` seems to require a manual `into_future` here.
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;

    let file_metadata = parquet_metadata.file_metadata();
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )
    .map_err(anyhow::Error::from)?;
    let fields: risingwave_common::array::arrow::arrow_schema_iceberg::Fields =
        converted_arrow_schema.fields;
    Ok(fields)
}