risingwave_connector/parser/
parquet_parser.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures_async_stream::try_stream;
use prometheus::core::GenericCounter;
use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::types::{Datum, ScalarImpl};
use thiserror_ext::AsReport;

use crate::parser::ConnectorResult;
use crate::source::SourceColumnDesc;

/// `ParquetParser` is responsible for converting the incoming `record_batch_stream`
/// into `StreamChunk`s.
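///
/// A rough usage sketch (not compiled; `rw_columns` and `record_batch_stream` are
/// assumed to be built by the caller from the source catalog and the Parquet reader):
///
/// ```ignore
/// let parser = ParquetParser::new(rw_columns, "data.parquet".to_owned(), 0)?;
/// // One `StreamChunk` is yielded per Parquet record batch; both metrics are optional.
/// let chunk_stream = parser.into_stream(record_batch_stream, None, None);
/// ```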
#[derive(Debug)]
pub struct ParquetParser {
    rw_columns: Vec<SourceColumnDesc>,
    file_name: String,
    offset: usize,
}

impl ParquetParser {
    pub fn new(
        rw_columns: Vec<SourceColumnDesc>,
        file_name: String,
        offset: usize,
    ) -> ConnectorResult<Self> {
        Ok(Self {
            rw_columns,
            file_name,
            offset,
        })
    }

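    /// Converts the incoming `record_batch_stream` into a stream of `StreamChunk`s,
    /// yielding one chunk per Parquet record batch.
    ///
    /// `file_source_input_row_count_metrics` counts every row read from the file, while
    /// `parquet_source_skip_row_count_metrics` counts, for each source column missing
    /// from (or type-mismatched with) the Parquet schema, the rows filled with NULL.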
    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
    pub async fn into_stream(
        mut self,
        record_batch_stream: parquet::arrow::async_reader::ParquetRecordBatchStream<
            tokio_util::compat::Compat<opendal::FuturesAsyncReader>,
        >,
        file_source_input_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
        parquet_source_skip_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
    ) {
        #[for_await]
        for record_batch in record_batch_stream {
            let record_batch: RecordBatch = record_batch?;
            // Convert each record batch into a stream chunk according to the user-defined schema.
            let chunk: StreamChunk = self.convert_record_batch_to_stream_chunk(
                record_batch,
                file_source_input_row_count_metrics.clone(),
                parquet_source_skip_row_count_metrics.clone(),
            )?;

            yield chunk;
        }
    }

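    /// Advances the per-file row offset used to populate the hidden `_rw_offset` column.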
    fn inc_offset(&mut self) {
        self.offset += 1;
    }

    /// Transforms the given `RecordBatch` into a `StreamChunk`.
    ///
    /// For each column in the source schema:
    /// - If the column's schema matches a column in the `RecordBatch` (both the data type and column name are the same),
    ///   the corresponding records are converted into a column of the `StreamChunk`.
    /// - If the column's schema does not match, null values are inserted.
    /// - Hidden columns are handled separately by filling in the appropriate fields to ensure the data chunk maintains the correct format.
    /// - If a column in the Parquet file does not exist in the source schema, it is skipped.
    ///
    /// # Arguments
    ///
    /// * `record_batch` - The `RecordBatch` to be converted into a `StreamChunk`.
    ///
    /// # Returns
    ///
    /// A `StreamChunk` containing the converted data from the `RecordBatch`.
    ///
    /// The hidden columns that must be included here are `_rw_file` and `_rw_offset`.
    /// Depending on whether the user specifies a primary key (pk), there may be an additional hidden column `row_id`.
    /// Therefore, the maximum number of hidden columns is three.
    fn convert_record_batch_to_stream_chunk(
        &mut self,
        record_batch: RecordBatch,
        file_source_input_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
        parquet_source_skip_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
    ) -> Result<StreamChunk, crate::error::ConnectorError> {
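        // Hidden columns: `_rw_file`, `_rw_offset`, and (when no primary key is specified) `row_id`.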
        const MAX_HIDDEN_COLUMN_NUMS: usize = 3;
        let column_size = self.rw_columns.len();
        let mut chunk_columns = Vec::with_capacity(self.rw_columns.len() + MAX_HIDDEN_COLUMN_NUMS);

        for source_column in self.rw_columns.clone() {
            match source_column.column_type {
                crate::source::SourceColumnType::Normal => {
                    let rw_data_type: &risingwave_common::types::DataType =
                        &source_column.data_type;
                    let rw_column_name = &source_column.name;
                    if let Some(parquet_column) = record_batch.column_by_name(rw_column_name)
                        && is_parquet_schema_match_source_schema(
                            parquet_column.data_type(),
                            rw_data_type,
                        )
                    {
                        let arrow_field = IcebergArrowConvert
                            .to_arrow_field(rw_column_name, rw_data_type)
                            .map_err(|e| {
                                crate::parser::AccessError::ParquetParser {
                                    message: format!(
                                        "to_arrow_field failed, column='{}', rw_type='{}', offset={}, error={}",
                                        rw_column_name, rw_data_type, self.offset, e.as_report()
                                    )
                                }
                            })?;
                        let array_impl = IcebergArrowConvert
                            .array_from_arrow_array(&arrow_field, parquet_column)
                            .map_err(|e| {
                                crate::parser::AccessError::ParquetParser {
                                    message: format!(
                                        "array_from_arrow_array failed, column='{}', rw_type='{}', arrow_field='{}', parquet_type='{}', offset={}, error={}",
                                        rw_column_name,
                                        rw_data_type,
                                        arrow_field.data_type(),
                                        parquet_column.data_type(),
                                        self.offset,
                                        e.as_report()
                                    )
                                }
                            })?;
                        chunk_columns.push(Arc::new(array_impl));
                    } else {
                        // Handle additional columns: for the file source these are the offset and the file name.
                        // Columns defined in the user schema but not present in the Parquet file are filled with NULL.
                        let column = if let Some(additional_column_type) =
                            &source_column.additional_column.column_type
                        {
                            match additional_column_type {
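                                // `_rw_offset` holds the running row index within the file;
                                // `_rw_file` repeats the file name for every row in the batch.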
                                risingwave_pb::plan_common::additional_column::ColumnType::Offset(_) => {
                                    let mut array_builder = ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                                    for _ in 0..record_batch.num_rows() {
                                        let datum: Datum = Some(ScalarImpl::Utf8((self.offset).to_string().into()));
                                        self.inc_offset();
                                        array_builder.append(datum);
                                    }
                                    Arc::new(array_builder.finish())
                                }
                                risingwave_pb::plan_common::additional_column::ColumnType::Filename(_) => {
                                    let mut array_builder = ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                                    let datum: Datum = Some(ScalarImpl::Utf8(self.file_name.clone().into()));
                                    array_builder.append_n(record_batch.num_rows(), datum);
                                    Arc::new(array_builder.finish())
                                }
                                _ => unreachable!(),
                            }
                        } else {
                            // For columns defined in the source schema but not present in the Parquet file, null values are filled in.
                            let mut array_builder =
                                ArrayBuilderImpl::with_type(column_size, rw_data_type.clone());
                            array_builder.append_n_null(record_batch.num_rows());
                            if let Some(metrics) = parquet_source_skip_row_count_metrics.clone() {
                                metrics.inc_by(record_batch.num_rows() as u64);
                            }
                            Arc::new(array_builder.finish())
                        };
                        chunk_columns.push(column);
                    }
                }
                crate::source::SourceColumnType::RowId => {
                    let mut array_builder =
                        ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                    let datum: Datum = None;
                    array_builder.append_n(record_batch.num_rows(), datum);
                    let res = array_builder.finish();
                    let column = Arc::new(res);
                    chunk_columns.push(column);
                }
                // The following fields are only used in CDC sources
                crate::source::SourceColumnType::Offset | crate::source::SourceColumnType::Meta => {
                    unreachable!()
                }
            }
        }
        if let Some(metrics) = file_source_input_row_count_metrics {
            metrics.inc_by(record_batch.num_rows() as u64);
        }

        let data_chunk = DataChunk::new(chunk_columns, record_batch.num_rows());
        Ok(data_chunk.into())
    }
}