risingwave_connector/source/iceberg/parquet_file_handler.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future::IntoFuture;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

use anyhow::anyhow;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, TryFutureExt};
use iceberg::io::{
    FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use iceberg::{Error, ErrorKind};
use itertools::Itertools;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::{Azblob, Gcs, S3};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, parquet_to_arrow_schema};
use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader};
use prometheus::core::GenericCounter;
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
use url::Url;

use crate::error::ConnectorResult;
use crate::parser::ParquetParser;
use crate::source::{Column, SourceColumnDesc};

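/// Adapter that exposes an Iceberg [`FileRead`] as a parquet [`AsyncFileReader`], so the
/// `parquet` crate can fetch byte ranges and the footer metadata through Iceberg's `FileIO`.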
pub struct ParquetFileReader<R: FileRead> {
    meta: FileMetadata,
    r: R,
}

impl<R: FileRead> ParquetFileReader<R> {
    pub fn new(meta: FileMetadata, r: R) -> Self {
        Self { meta, r }
    }
}

impl<R: FileRead> AsyncFileReader for ParquetFileReader<R> {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start as _..range.end as _)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new();
            let size = self.meta.size as usize;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}

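/// Builds a [`ParquetRecordBatchStreamBuilder`] for a Parquet file stored on S3, going through
/// Iceberg's `FileIO` rather than OpenDAL.
///
/// A minimal usage sketch (the region, credentials, and `s3://` location below are illustrative
/// placeholders):
///
/// ```ignore
/// let builder = create_parquet_stream_builder(
///     "us-east-1".to_owned(),
///     "my-access-key".to_owned(),
///     "my-secret-key".to_owned(),
///     "s3://my-bucket/path/to/file.parquet".to_owned(),
/// )
/// .await?;
/// let mut stream = builder.build()?;
/// // Requires `futures::TryStreamExt` in scope; each item is an Arrow `RecordBatch`.
/// while let Some(batch) = stream.try_next().await? {
///     println!("read {} rows", batch.num_rows());
/// }
/// ```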
pub async fn create_parquet_stream_builder(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    location: String,
) -> Result<ParquetRecordBatchStreamBuilder<ParquetFileReader<impl FileRead>>, anyhow::Error> {
    let mut props = HashMap::new();
    props.insert(S3_REGION, s3_region.clone());
    props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone());
    props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone());

    let file_io_builder = FileIOBuilder::new("s3");
    let file_io = file_io_builder
        .with_props(props.into_iter())
        .build()
        .map_err(|e| anyhow!(e))?;
    let parquet_file = file_io.new_input(&location).map_err(|e| anyhow!(e))?;

    let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?;
    let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?;
    let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);

    ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
        .await
        .map_err(|e| anyhow!(e))
}

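/// Creates an OpenDAL [`Operator`] for S3 with logging and retry layers. Config loading from the
/// environment is disabled, so region, endpoint, and credentials must be passed explicitly.
///
/// A minimal sketch (bucket name, endpoint, and credentials are placeholders):
///
/// ```ignore
/// let op = new_s3_operator(
///     "us-east-1".to_owned(),
///     "my-access-key".to_owned(),
///     "my-secret-key".to_owned(),
///     "my-bucket".to_owned(),
///     "https://s3.us-east-1.amazonaws.com".to_owned(),
/// )?;
/// let files =
///     list_data_directory(op, "s3://my-bucket/data/".to_owned(), &FileScanBackend::S3).await?;
/// ```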
pub fn new_s3_operator(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    bucket: String,
    s3_endpoint: String,
) -> ConnectorResult<Operator> {
    let mut builder = S3::default();
    builder = builder
        .region(&s3_region)
        .endpoint(&s3_endpoint)
        .access_key_id(&s3_access_key)
        .secret_access_key(&s3_secret_key)
        .bucket(&bucket)
        .disable_config_load();
    let op: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();

    Ok(op)
}

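/// Creates an OpenDAL [`Operator`] for GCS with logging and retry layers; `credential` is the
/// credential string passed to OpenDAL's GCS backend (typically a service-account credential).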
pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult<Operator> {
    // Create the GCS builder.
    let builder = Gcs::default().bucket(&bucket).credential(&credential);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

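/// Creates an OpenDAL [`Operator`] for Azure Blob Storage with logging and retry layers.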
pub fn new_azblob_operator(
    endpoint: String,
    account_name: String,
    account_key: String,
    container_name: String,
) -> ConnectorResult<Operator> {
    // Create the Azure Blob Storage builder.
    let mut builder = Azblob::default();
    builder = builder
        .container(&container_name)
        .endpoint(&endpoint)
        .account_name(&account_name)
        .account_key(&account_key);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

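/// Object storage backends supported for scanning files.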
#[derive(Debug, Clone)]
pub enum FileScanBackend {
    S3,
    Gcs,
    Azblob,
}

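/// Splits an object-store URL into the bucket and the path relative to that bucket.
///
/// For example (illustrative values):
///
/// ```ignore
/// let (bucket, file_name) =
///     extract_bucket_and_file_name("s3://my-bucket/dir/data.parquet", &FileScanBackend::S3)?;
/// assert_eq!(bucket, "my-bucket");
/// assert_eq!(file_name, "dir/data.parquet");
/// ```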
pub fn extract_bucket_and_file_name(
    location: &str,
    file_scan_backend: &FileScanBackend,
) -> ConnectorResult<(String, String)> {
    let url = Url::parse(location)?;
    let bucket = url
        .host_str()
        .ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                format!("Invalid url: {}, missing bucket", location),
            )
        })?
        .to_owned();
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    let file_name = location[prefix.len()..].to_string();
    Ok((bucket, file_name))
}

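/// Lists the objects under `dir` and returns their full URLs (scheme and bucket included).
/// `dir` must itself be a full URL starting with the backend's `<scheme>://<bucket>/` prefix;
/// otherwise an error is returned.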
pub async fn list_data_directory(
    op: Operator,
    dir: String,
    file_scan_backend: &FileScanBackend,
) -> Result<Vec<String>, anyhow::Error> {
    let (bucket, file_name) = extract_bucket_and_file_name(&dir, file_scan_backend)?;
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    if dir.starts_with(&prefix) {
        op.list(&file_name)
            .await
            .map_err(|e| anyhow!(e))
            .map(|list| {
                list.into_iter()
                    .map(|entry| prefix.clone() + entry.path())
                    .collect()
            })
    } else {
        Err(Error::new(
            ErrorKind::DataInvalid,
            format!("Invalid url: {}, should start with {}", dir, prefix),
        ))?
    }
}

/// Extracts a suitable `ProjectionMask` from a Parquet file schema based on the user's requested schema.
///
/// This function is used for column pruning of Parquet files. It checks the user's requested schema
/// against the schema of the Parquet file currently being read. If the provided `columns` are `None`,
/// it returns `ProjectionMask::all()`. Otherwise, it keeps only the root columns whose name and data
/// type both match the requested schema, facilitating efficient reading of the `RecordBatch`.
///
/// # Parameters
/// - `columns`: An optional vector of `Column` representing the user's requested schema.
/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file.
///
/// # Returns
/// - A `ConnectorResult<ProjectionMask>`, which represents the valid columns in the Parquet file schema
///   that correspond to the requested schema. If an error occurs during processing, it returns an
///   appropriate error.
pub fn get_project_mask(
    columns: Option<Vec<Column>>,
    metadata: &FileMetaData,
) -> ConnectorResult<ProjectionMask> {
    match columns {
        Some(rw_columns) => {
            let root_column_names = metadata
                .schema_descr()
                .root_schema()
                .get_fields()
                .iter()
                .map(|field| field.name())
                .collect_vec();

            let converted_arrow_schema =
                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
                    .map_err(anyhow::Error::from)?;
            let valid_column_indices: Vec<usize> = rw_columns
                .iter()
                .filter_map(|column| {
                    root_column_names
                        .iter()
                        .position(|&name| name == column.name)
                        .and_then(|pos| {
                            let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_iceberg::DataType =
                                converted_arrow_schema.field_with_name(&column.name).ok()?.data_type();
                            let rw_data_type: &risingwave_common::types::DataType = &column.data_type;
                            if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) {
                                Some(pos)
                            } else {
                                None
                            }
                        })
                })
                .collect();

            Ok(ProjectionMask::roots(
                metadata.schema_descr(),
                valid_column_indices,
            ))
        }
        None => Ok(ProjectionMask::all()),
    }
}

/// Reads a specified Parquet file and converts its content into a stream of chunks.
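///
/// Column pruning (via [`get_project_mask`]), the batch size, and the row `offset` to resume from
/// are applied when building the underlying [`ParquetRecordBatchStreamBuilder`]; the optional
/// counters track rows read by the file source and rows skipped in the Parquet source.
///
/// A usage sketch (the operator, file name, and batch size are placeholders):
///
/// ```ignore
/// let mut chunk_stream =
///     read_parquet_file(op, "dir/data.parquet".to_owned(), None, None, 1024, 0, None, None)
///         .await?;
/// // Requires `futures::TryStreamExt` in scope.
/// while let Some(chunk) = chunk_stream.try_next().await? {
///     // Each item is a `StreamChunk` converted from one Arrow record batch.
/// }
/// ```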
pub async fn read_parquet_file(
    op: Operator,
    file_name: String,
    rw_columns: Option<Vec<Column>>,
    parser_columns: Option<Vec<SourceColumnDesc>>,
    batch_size: usize,
    offset: usize,
    file_source_input_row_count_metrics: Option<
        LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
    >,
    parquet_source_skip_row_count_metrics: Option<
        LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
    >,
) -> ConnectorResult<
    Pin<Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>>,
> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future() // Unlike `rustc`, `try_stream` seems to require manual `into_future`.
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;

    let file_metadata = parquet_metadata.file_metadata();
    let projection_mask = get_project_mask(rw_columns, file_metadata)?;

    // For the Parquet format, we directly convert from a record batch to a stream chunk.
    // Therefore, the offset of the Parquet file is simply the number of rows already read from the file.
    let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(batch_size)
        .with_projection(projection_mask)
        .with_offset(offset)
        .build()?;
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )
    .map_err(anyhow::Error::from)?;
    let columns = match parser_columns {
        Some(columns) => columns,
        None => converted_arrow_schema
            .fields
            .iter()
            .enumerate()
            .map(|(index, field_ref)| {
                let data_type = IcebergArrowConvert.type_from_field(field_ref).unwrap();
                let column_desc = ColumnDesc::named(
                    field_ref.name().clone(),
                    ColumnId::new(index as i32),
                    data_type,
                );
                SourceColumnDesc::from(&column_desc)
            })
            .collect(),
    };
    let parquet_parser = ParquetParser::new(columns, file_name, offset)?;
    let msg_stream: Pin<
        Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>,
    > = parquet_parser.into_stream(
        record_batch_stream,
        file_source_input_row_count_metrics,
        parquet_source_skip_row_count_metrics,
    );
    Ok(msg_stream)
}

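/// Reads only the Parquet footer of `file_name` through the given [`Operator`] and returns the
/// file's schema converted to Arrow `Fields`, without decoding any row groups.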
pub async fn get_parquet_fields(
    op: Operator,
    file_name: String,
) -> ConnectorResult<risingwave_common::array::arrow::arrow_schema_iceberg::Fields> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future() // Unlike `rustc`, `try_stream` seems to require manual `into_future`.
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;

    let file_metadata = parquet_metadata.file_metadata();
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )
    .map_err(anyhow::Error::from)?;
    let fields: risingwave_common::array::arrow::arrow_schema_iceberg::Fields =
        converted_arrow_schema.fields;
    Ok(fields)
}