risingwave_connector/source/iceberg/parquet_file_handler.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future::IntoFuture;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

use anyhow::{Context, bail};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, TryFutureExt};
use iceberg::io::{
    FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use itertools::Itertools;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::{Azblob, Gcs, S3};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, parquet_to_arrow_schema};
use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader};
use prometheus::core::GenericCounter;
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
use url::Url;

use crate::error::ConnectorResult;
use crate::parser::ParquetParser;
use crate::source::{Column, SourceColumnDesc};

pub struct ParquetFileReader<R: FileRead> {
    meta: FileMetadata,
    r: R,
}

impl<R: FileRead> ParquetFileReader<R> {
    pub fn new(meta: FileMetadata, r: R) -> Self {
        Self { meta, r }
    }
}

impl<R: FileRead> AsyncFileReader for ParquetFileReader<R> {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start as _..range.end as _)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    fn get_metadata(
        &mut self,
        _options: Option<&parquet::arrow::arrow_reader::ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new();
            let size = self.meta.size;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}

pub async fn create_parquet_stream_builder(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    location: String,
) -> Result<ParquetRecordBatchStreamBuilder<ParquetFileReader<impl FileRead>>, anyhow::Error> {
    let mut props = HashMap::new();
    props.insert(S3_REGION, s3_region.clone());
    props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone());
    props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone());

    let file_io_builder = FileIOBuilder::new("s3");
    let file_io = file_io_builder.with_props(props.into_iter()).build()?;
    let parquet_file = file_io.new_input(&location)?;

    let parquet_metadata = parquet_file.metadata().await?;
    let parquet_reader = parquet_file.reader().await?;
    let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);

    ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
        .await
        .map_err(Into::into)
}
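
// A minimal usage sketch of `create_parquet_stream_builder` (the region,
// credentials, and object location below are placeholder values, not part of
// this module): build a stream builder for a Parquet object on S3 and inspect
// its Arrow schema and row-group count before deciding how to read it.
#[allow(dead_code)]
async fn example_inspect_parquet_via_file_io() -> Result<(), anyhow::Error> {
    let builder = create_parquet_stream_builder(
        "us-east-1".to_owned(),
        "my-access-key".to_owned(),
        "my-secret-key".to_owned(),
        "s3://my-bucket/data/part-0.parquet".to_owned(),
    )
    .await?;
    tracing::debug!(
        "schema: {:?}, row groups: {}",
        builder.schema(),
        builder.metadata().num_row_groups()
    );
    Ok(())
}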

pub fn new_s3_operator(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    bucket: String,
    s3_endpoint: String,
) -> ConnectorResult<Operator> {
    let mut builder = S3::default();
    builder = builder
        .region(&s3_region)
        .endpoint(&s3_endpoint)
        .access_key_id(&s3_access_key)
        .secret_access_key(&s3_secret_key)
        .bucket(&bucket)
        .disable_config_load();
    let op: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();

    Ok(op)
}

pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult<Operator> {
    // Create gcs builder.
    let builder = Gcs::default().bucket(&bucket).credential(&credential);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

pub fn new_azblob_operator(
    endpoint: String,
    account_name: String,
    account_key: String,
    container_name: String,
) -> ConnectorResult<Operator> {
    // Create azblob builder.
    let mut builder = Azblob::default();
    builder = builder
        .container(&container_name)
        .endpoint(&endpoint)
        .account_name(&account_name)
        .account_key(&account_key);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}
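
// A minimal sketch of wiring up one of the operator constructors above (all
// values are placeholders, not real credentials): build an S3-backed
// `Operator` that the listing and reading helpers below can reuse.
#[allow(dead_code)]
fn example_build_s3_operator() -> ConnectorResult<Operator> {
    new_s3_operator(
        "us-east-1".to_owned(),
        "my-access-key".to_owned(),
        "my-secret-key".to_owned(),
        "my-bucket".to_owned(),
        "https://s3.us-east-1.amazonaws.com".to_owned(),
    )
}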

#[derive(Debug, Clone)]
pub enum FileScanBackend {
    S3,
    Gcs,
    Azblob,
}

pub fn extract_bucket_and_file_name(
    location: &str,
    file_scan_backend: &FileScanBackend,
) -> ConnectorResult<(String, String)> {
    let url = Url::parse(location)?;
    let bucket = url
        .host_str()
        .with_context(|| format!("Invalid url: {}, missing bucket", location))?
        .to_owned();
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    let file_name = location[prefix.len()..].to_string();
    Ok((bucket, file_name))
}
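
// A small sketch of the expected split (the URL is a made-up example):
// `s3://my-bucket/dir/a.parquet` yields bucket `my-bucket` and file name
// `dir/a.parquet`.
#[allow(dead_code)]
fn example_extract_bucket_and_file_name() -> ConnectorResult<()> {
    let (bucket, file_name) =
        extract_bucket_and_file_name("s3://my-bucket/dir/a.parquet", &FileScanBackend::S3)?;
    debug_assert_eq!(bucket, "my-bucket");
    debug_assert_eq!(file_name, "dir/a.parquet");
    Ok(())
}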

pub async fn list_data_directory(
    op: Operator,
    dir: String,
    file_scan_backend: &FileScanBackend,
) -> Result<Vec<String>, anyhow::Error> {
    let (bucket, file_name) = extract_bucket_and_file_name(&dir, file_scan_backend)?;
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    if dir.starts_with(&prefix) {
        op.list(&file_name).await.map_err(Into::into).map(|list| {
            list.into_iter()
                .map(|entry| prefix.clone() + entry.path())
                .collect()
        })
    } else {
        bail!("Invalid url: {}, should start with {}", dir, prefix)
    }
}
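
// A minimal usage sketch (bucket and prefix are placeholders): list every
// object under an S3 prefix. Each returned entry is a full `s3://` URL;
// `extract_bucket_and_file_name` can split it back into a bucket and an
// in-bucket path.
#[allow(dead_code)]
async fn example_list_s3_prefix(op: Operator) -> Result<Vec<String>, anyhow::Error> {
    list_data_directory(op, "s3://my-bucket/data/".to_owned(), &FileScanBackend::S3).await
}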

/// Extracts a suitable `ProjectionMask` from a Parquet file schema based on the user's requested schema.
///
/// This function is used for column pruning of Parquet files. It checks the user's requested schema
/// against the schema of the Parquet file currently being read. If the provided `columns` is `None`
/// or the Parquet file contains nested data types, it returns `ProjectionMask::all()`. Otherwise,
/// it returns only the columns where both the data type and column name match the requested schema,
/// facilitating efficient reading of the `RecordBatch`.
///
/// # Parameters
/// - `columns`: An optional vector of `Column` representing the user's requested schema.
/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file.
///
/// # Returns
/// - A `ConnectorResult<ProjectionMask>` representing the valid columns in the Parquet file schema
///   that correspond to the requested schema. If an error occurs during processing, an appropriate
///   error is returned.
pub fn get_project_mask(
    columns: Option<Vec<Column>>,
    metadata: &FileMetaData,
    case_insensitive: bool,
) -> ConnectorResult<ProjectionMask> {
    match columns {
        Some(rw_columns) => {
            let root_column_names = metadata
                .schema_descr()
                .root_schema()
                .get_fields()
                .iter()
                .map(|field| field.name())
                .collect_vec();

            let converted_arrow_schema =
                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
                    .map_err(anyhow::Error::from)?;
            let mut lowercase_name_to_index: HashMap<String, Option<usize>> = HashMap::new();
            if case_insensitive {
                // Map each lowercased column name to its index; names that collide after
                // lowercasing are mapped to `None` so that ambiguous matches are skipped.
                for (index, name) in root_column_names.iter().enumerate() {
                    let key = name.to_ascii_lowercase();
                    match lowercase_name_to_index.get(&key) {
                        Some(Some(_)) => {
                            lowercase_name_to_index.insert(key, None);
                        }
                        Some(None) => {}
                        None => {
                            lowercase_name_to_index.insert(key, Some(index));
                        }
                    }
                }
            }

            // Prefer an exact name match; fall back to a case-insensitive match when enabled,
            // and keep the column only if its Arrow type matches the requested RisingWave type.
            let valid_column_indices: Vec<usize> = rw_columns
                .iter()
                .filter_map(|column| {
                    let exact_pos = root_column_names
                        .iter()
                        .position(|&name| name == column.name);
                    let pos = match (case_insensitive, exact_pos) {
                        (_, Some(pos)) => Some(pos),
                        (true, None) => lowercase_name_to_index
                            .get(&column.name.to_ascii_lowercase())
                            .copied()
                            .flatten(),
                        _ => None,
                    }?;
                    let arrow_field = converted_arrow_schema.fields.get(pos)?;
                    let arrow_data_type = arrow_field.data_type();
                    let rw_data_type: &risingwave_common::types::DataType = &column.data_type;
                    if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) {
                        Some(pos)
                    } else {
                        None
                    }
                })
                .collect();

            Ok(ProjectionMask::roots(
                metadata.schema_descr(),
                valid_column_indices,
            ))
        }
        None => Ok(ProjectionMask::all()),
    }
}
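
// A minimal usage sketch: derive the projection for a read, falling back to all
// columns when the caller did not restrict the schema. `requested_columns` is
// whatever the source layer hands us; `true` enables case-insensitive name
// matching.
#[allow(dead_code)]
fn example_projection_for_read(
    requested_columns: Option<Vec<Column>>,
    file_metadata: &FileMetaData,
) -> ConnectorResult<ProjectionMask> {
    get_project_mask(requested_columns, file_metadata, true)
}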

/// Reads a specified Parquet file and converts its content into a stream of chunks.
pub async fn read_parquet_file(
    op: Operator,
    file_name: String,
    rw_columns: Option<Vec<Column>>,
    parser_columns: Option<Vec<SourceColumnDesc>>,
    case_insensitive: bool,
    batch_size: usize,
    offset: usize,
    file_source_input_row_count_metrics: Option<
        LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
    >,
    parquet_source_skip_row_count_metrics: Option<
        LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
    >,
) -> ConnectorResult<
    Pin<Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>>,
> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future() // Unlike `rustc`, `try_stream` seems to require a manual `into_future`.
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader
        .get_metadata(None)
        .await
        .map_err(anyhow::Error::from)?;

    let file_metadata = parquet_metadata.file_metadata();
    {
        // Log Parquet file-level metadata.
        tracing::info!(
            "Reading parquet file: {}, from offset {}, num_row_groups={}, total_rows={}, kv_len={}",
            file_name,
            offset,
            parquet_metadata.row_groups().len(),
            file_metadata.num_rows(),
            file_metadata
                .key_value_metadata()
                .map(|m| m.len())
                .unwrap_or(0)
        );
        // Log each leaf column's path and types.
        let schema_descr = file_metadata.schema_descr();
        for col in schema_descr.columns() {
            let path = col.path().string();
            let physical = col.physical_type();
            let logical = col.logical_type();
            tracing::debug!(
                file = %file_name,
                column_path = path,
                physical_type = ?physical,
                logical_type = ?logical,
                type_length = ?col.type_length(),
                max_def_level = col.max_def_level(),
                max_rep_level = col.max_rep_level(),
                "Parquet file column schema"
            );
        }
    }
    let projection_mask = get_project_mask(rw_columns, file_metadata, case_insensitive)?;

    // For the Parquet format, we directly convert from a record batch to a stream chunk.
    // Therefore, the offset into the Parquet file represents the current position as the
    // number of rows already read from the file.
    let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(batch_size)
        .with_projection(projection_mask)
        .with_offset(offset)
        .build()?;
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )?;
    let columns = match parser_columns {
        Some(columns) => columns,
        None => converted_arrow_schema
            .fields
            .iter()
            .enumerate()
            .map(|(index, field_ref)| {
                let data_type = IcebergArrowConvert.type_from_field(field_ref).unwrap();
                let column_desc = ColumnDesc::named(
                    field_ref.name().clone(),
                    ColumnId::new(index as i32),
                    data_type,
                );
                SourceColumnDesc::from(&column_desc)
            })
            .collect(),
    };
    let parquet_parser = ParquetParser::new(columns, file_name, offset, case_insensitive)?;
    let msg_stream: Pin<
        Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>,
    > = parquet_parser.into_stream(
        record_batch_stream,
        file_source_input_row_count_metrics,
        parquet_source_skip_row_count_metrics,
    );
    Ok(msg_stream)
}
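
// A minimal end-to-end sketch (file name and batch size are placeholders):
// read a Parquet object through an existing `Operator`, inferring the parser
// columns from the file schema, and drain the resulting `StreamChunk` stream.
#[allow(dead_code)]
async fn example_read_parquet_chunks(op: Operator) -> ConnectorResult<()> {
    use futures::TryStreamExt;

    let mut chunk_stream = read_parquet_file(
        op,
        "data/part-0.parquet".to_owned(),
        None,  // no column pruning
        None,  // derive parser columns from the file schema
        false, // case-sensitive name matching
        1024,  // rows per record batch
        0,     // start from the first row
        None,  // no input-row metric
        None,  // no skipped-row metric
    )
    .await?;
    while let Some(chunk) = chunk_stream.try_next().await? {
        tracing::debug!("read a chunk with {} rows", chunk.cardinality());
    }
    Ok(())
}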

pub async fn get_parquet_fields(
    op: Operator,
    file_name: String,
) -> ConnectorResult<risingwave_common::array::arrow::arrow_schema_iceberg::Fields> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future() // Unlike `rustc`, `try_stream` seems to require a manual `into_future`.
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata(None).await?;

    let file_metadata = parquet_metadata.file_metadata();
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )?;
    let fields: risingwave_common::array::arrow::arrow_schema_iceberg::Fields =
        converted_arrow_schema.fields;
    Ok(fields)
}
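
// A minimal usage sketch (the file name is a placeholder): fetch only the
// Arrow fields of a Parquet object, e.g. to validate a user-declared source
// schema before reading any data.
#[allow(dead_code)]
async fn example_peek_parquet_fields(op: Operator) -> ConnectorResult<()> {
    let fields = get_parquet_fields(op, "data/part-0.parquet".to_owned()).await?;
    for field in fields.iter() {
        tracing::debug!("column `{}` has Arrow type {:?}", field.name(), field.data_type());
    }
    Ok(())
}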