risingwave_connector/source/iceberg/parquet_file_handler.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future::IntoFuture;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

use anyhow::{Context, bail};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, TryFutureExt};
use iceberg::io::{
    FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use itertools::Itertools;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::{Azblob, Gcs, S3};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, parquet_to_arrow_schema};
use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader};
use prometheus::core::GenericCounter;
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
use url::Url;

use crate::error::ConnectorResult;
use crate::parser::ParquetParser;
use crate::source::{Column, SourceColumnDesc};

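/// Adapts an Iceberg `FileRead` plus its `FileMetadata` to the Parquet crate's
/// `AsyncFileReader` interface, so Iceberg-managed files can be consumed by
/// `ParquetRecordBatchStreamBuilder`.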
pub struct ParquetFileReader<R: FileRead> {
    meta: FileMetadata,
    r: R,
}

impl<R: FileRead> ParquetFileReader<R> {
    pub fn new(meta: FileMetadata, r: R) -> Self {
        Self { meta, r }
    }
}

impl<R: FileRead> AsyncFileReader for ParquetFileReader<R> {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start as _..range.end as _)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new();
            let size = self.meta.size as usize;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}

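/// Opens the Parquet file at `location` through Iceberg `FileIO` with static S3
/// credentials and returns a `ParquetRecordBatchStreamBuilder` over it.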
pub async fn create_parquet_stream_builder(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    location: String,
) -> Result<ParquetRecordBatchStreamBuilder<ParquetFileReader<impl FileRead>>, anyhow::Error> {
    let mut props = HashMap::new();
    props.insert(S3_REGION, s3_region.clone());
    props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone());
    props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone());

    let file_io_builder = FileIOBuilder::new("s3");
    let file_io = file_io_builder.with_props(props.into_iter()).build()?;
    let parquet_file = file_io.new_input(&location)?;

    let parquet_metadata = parquet_file.metadata().await?;
    let parquet_reader = parquet_file.reader().await?;
    let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);

    ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
        .await
        .map_err(Into::into)
}

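/// Builds an OpenDAL S3 `Operator` with logging and retry layers.
///
/// A minimal usage sketch (the region, endpoint, and credential values below are
/// placeholders, not defaults of this module):
///
/// ```ignore
/// let op = new_s3_operator(
///     "us-east-1".to_owned(),
///     access_key,
///     secret_key,
///     "my-bucket".to_owned(),
///     "https://s3.us-east-1.amazonaws.com".to_owned(),
/// )?;
/// ```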
pub fn new_s3_operator(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    bucket: String,
    s3_endpoint: String,
) -> ConnectorResult<Operator> {
    let mut builder = S3::default();
    builder = builder
        .region(&s3_region)
        .endpoint(&s3_endpoint)
        .access_key_id(&s3_access_key)
        .secret_access_key(&s3_secret_key)
        .bucket(&bucket)
        .disable_config_load();
    let op: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();

    Ok(op)
}

pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult<Operator> {
    // Create the GCS builder.
    let builder = Gcs::default().bucket(&bucket).credential(&credential);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

pub fn new_azblob_operator(
    endpoint: String,
    account_name: String,
    account_key: String,
    container_name: String,
) -> ConnectorResult<Operator> {
    // Create the Azblob builder.
    let mut builder = Azblob::default();
    builder = builder
        .container(&container_name)
        .endpoint(&endpoint)
        .account_name(&account_name)
        .account_key(&account_key);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

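/// Object-store backend of a file scan; determines the URL scheme used when
/// building and parsing locations (`s3://`, `gcs://`, or `azblob://`).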
#[derive(Debug, Clone)]
pub enum FileScanBackend {
    S3,
    Gcs,
    Azblob,
}

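/// Splits an object-store URL into `(bucket, file_name)`, where `file_name` is the
/// path relative to the bucket. For example (illustrative, not from a test): with
/// `FileScanBackend::S3`, `s3://my-bucket/dir/a.parquet` yields
/// `("my-bucket", "dir/a.parquet")`.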
pub fn extract_bucket_and_file_name(
    location: &str,
    file_scan_backend: &FileScanBackend,
) -> ConnectorResult<(String, String)> {
    let url = Url::parse(location)?;
    let bucket = url
        .host_str()
        .with_context(|| format!("Invalid url: {}, missing bucket", location))?
        .to_owned();
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    let file_name = location[prefix.len()..].to_string();
    Ok((bucket, file_name))
}

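/// Lists the objects under `dir` (which must start with the backend's
/// `<scheme>://<bucket>/` prefix) and returns their full URLs.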
pub async fn list_data_directory(
    op: Operator,
    dir: String,
    file_scan_backend: &FileScanBackend,
) -> Result<Vec<String>, anyhow::Error> {
    let (bucket, file_name) = extract_bucket_and_file_name(&dir, file_scan_backend)?;
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    if dir.starts_with(&prefix) {
        op.list(&file_name).await.map_err(Into::into).map(|list| {
            list.into_iter()
                .map(|entry| prefix.clone() + entry.path())
                .collect()
        })
    } else {
        bail!("Invalid url: {}, should start with {}", dir, prefix)
    }
}

/// Extracts a suitable `ProjectionMask` from a Parquet file schema based on the user's requested schema.
///
/// This function is used for column pruning of Parquet files. It checks the user's requested schema
/// against the schema of the currently read Parquet file. If the provided `columns` are `None`
/// or if the Parquet file contains nested data types, it returns `ProjectionMask::all()`. Otherwise,
/// it returns only the columns where both the data type and column name match the requested schema,
/// facilitating efficient reading of the `RecordBatch`.
///
/// # Parameters
/// - `columns`: An optional vector of `Column` representing the user's requested schema.
/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file.
///
/// # Returns
/// - A `ConnectorResult<ProjectionMask>`, which represents the valid columns in the Parquet file schema
///   that correspond to the requested schema. If an error occurs during processing, it returns an
///   appropriate error.
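///
/// # Example
///
/// A minimal sketch (not from this crate's tests; `reader` and `requested_columns`
/// are assumed to exist in the caller's scope):
///
/// ```ignore
/// let parquet_metadata = reader.get_metadata().await?;
/// let mask = get_project_mask(Some(requested_columns), parquet_metadata.file_metadata())?;
/// let stream = ParquetRecordBatchStreamBuilder::new(reader)
///     .await?
///     .with_projection(mask)
///     .build()?;
/// ```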
pub fn get_project_mask(
    columns: Option<Vec<Column>>,
    metadata: &FileMetaData,
) -> ConnectorResult<ProjectionMask> {
    match columns {
        Some(rw_columns) => {
            let root_column_names = metadata
                .schema_descr()
                .root_schema()
                .get_fields()
                .iter()
                .map(|field| field.name())
                .collect_vec();

            let converted_arrow_schema =
                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
                    .map_err(anyhow::Error::from)?;
            let valid_column_indices: Vec<usize> = rw_columns
                .iter()
                .filter_map(|column| {
                    root_column_names
                        .iter()
                        .position(|&name| name == column.name)
                        .and_then(|pos| {
                            let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_iceberg::DataType = converted_arrow_schema.field_with_name(&column.name).ok()?.data_type();
                            let rw_data_type: &risingwave_common::types::DataType = &column.data_type;
                            if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) {
                                Some(pos)
                            } else {
                                None
                            }
                        })
                })
                .collect();

            Ok(ProjectionMask::roots(
                metadata.schema_descr(),
                valid_column_indices,
            ))
        }
        None => Ok(ProjectionMask::all()),
    }
}

/// Reads a specified Parquet file and converts its content into a stream of chunks.
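///
/// The stream yields `StreamChunk`s converted from the file's record batches,
/// starting at row `offset` and batched by `batch_size`.
///
/// A hedged usage sketch (`op` may come from e.g. `new_s3_operator`; the file name
/// and batch size are placeholders, and the metric handles are simply omitted):
///
/// ```ignore
/// use futures::StreamExt;
///
/// let mut chunks =
///     read_parquet_file(op, "dir/a.parquet".to_owned(), None, None, 1024, 0, None, None).await?;
/// while let Some(chunk) = chunks.next().await {
///     let chunk = chunk?;
///     // ... process the `StreamChunk` ...
/// }
/// ```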
pub async fn read_parquet_file(
    op: Operator,
    file_name: String,
    rw_columns: Option<Vec<Column>>,
    parser_columns: Option<Vec<SourceColumnDesc>>,
    batch_size: usize,
    offset: usize,
    file_source_input_row_count_metrics: Option<
        LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
    >,
    parquet_source_skip_row_count_metrics: Option<
        LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
    >,
) -> ConnectorResult<
    Pin<Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>>,
> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future() // Unlike `rustc`, `try_stream` seems to require a manual `into_future()` call.
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;

    let file_metadata = parquet_metadata.file_metadata();
    let projection_mask = get_project_mask(rw_columns, file_metadata)?;

    // For the Parquet format, we convert a record batch directly into a stream chunk, so the
    // offset of a Parquet file is the number of rows already read from it.
    let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(batch_size)
        .with_projection(projection_mask)
        .with_offset(offset)
        .build()?;
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )?;
    let columns = match parser_columns {
        Some(columns) => columns,
        None => converted_arrow_schema
            .fields
            .iter()
            .enumerate()
            .map(|(index, field_ref)| {
                let data_type = IcebergArrowConvert.type_from_field(field_ref).unwrap();
                let column_desc = ColumnDesc::named(
                    field_ref.name().clone(),
                    ColumnId::new(index as i32),
                    data_type,
                );
                SourceColumnDesc::from(&column_desc)
            })
            .collect(),
    };
    let parquet_parser = ParquetParser::new(columns, file_name, offset)?;
    let msg_stream: Pin<
        Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>,
    > = parquet_parser.into_stream(
        record_batch_stream,
        file_source_input_row_count_metrics,
        parquet_source_skip_row_count_metrics,
    );
    Ok(msg_stream)
}

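/// Fetches the Parquet metadata of `file_name` via `op` and returns its schema
/// converted to Arrow (`arrow_schema_iceberg`) `Fields`.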
pub async fn get_parquet_fields(
    op: Operator,
    file_name: String,
) -> ConnectorResult<risingwave_common::array::arrow::arrow_schema_iceberg::Fields> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future() // Unlike `rustc`, `try_stream` seems to require a manual `into_future()` call.
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata().await?;

    let file_metadata = parquet_metadata.file_metadata();
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )?;
    let fields: risingwave_common::array::arrow::arrow_schema_iceberg::Fields =
        converted_arrow_schema.fields;
    Ok(fields)
}