risingwave_connector/parser/parquet_parser.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures_async_stream::try_stream;
use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
use risingwave_common::types::{Datum, ScalarImpl};

use crate::parser::ConnectorResult;
use crate::source::SourceColumnDesc;

/// `ParquetParser` is responsible for converting the incoming `record_batch_stream`
/// into a `StreamChunk`.
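///
/// A minimal usage sketch (illustrative only, not a compiled doctest): `rw_columns`
/// and `batch_stream` are assumed to be provided by the surrounding file source, and
/// both metric arguments are skipped with `None`.
///
/// ```ignore
/// use futures::StreamExt;
///
/// let parser = ParquetParser::new(rw_columns, "data.parquet".to_owned(), 0)?;
/// let mut chunks = parser.into_stream(batch_stream, None, None);
/// while let Some(chunk) = chunks.next().await {
///     let chunk: StreamChunk = chunk?;
///     // ...consume the stream chunk
/// }
/// ```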
#[derive(Debug)]
pub struct ParquetParser {
    rw_columns: Vec<SourceColumnDesc>,
    file_name: String,
    offset: usize,
}

impl ParquetParser {
    pub fn new(
        rw_columns: Vec<SourceColumnDesc>,
        file_name: String,
        offset: usize,
    ) -> ConnectorResult<Self> {
        Ok(Self {
            rw_columns,
            file_name,
            offset,
        })
    }

    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
    pub async fn into_stream(
        mut self,
        record_batch_stream: parquet::arrow::async_reader::ParquetRecordBatchStream<
            tokio_util::compat::Compat<opendal::FuturesAsyncReader>,
        >,
        file_source_input_row_count_metrics: Option<
            risingwave_common::metrics::LabelGuardedMetric<
                prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
                4,
            >,
        >,
        parquet_source_skip_row_count_metrics: Option<
            risingwave_common::metrics::LabelGuardedMetric<
                prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
                4,
            >,
        >,
    ) {
        #[for_await]
        for record_batch in record_batch_stream {
            let record_batch: RecordBatch = record_batch?;
            // Convert each record batch into a stream chunk according to the user-defined schema.
            let chunk: StreamChunk = self.convert_record_batch_to_stream_chunk(
                record_batch,
                file_source_input_row_count_metrics.clone(),
                parquet_source_skip_row_count_metrics.clone(),
            )?;

            yield chunk;
        }
    }

    fn inc_offset(&mut self) {
        self.offset += 1;
    }

    /// The function `convert_record_batch_to_stream_chunk` is designed to transform the given `RecordBatch` into a `StreamChunk`.
    ///
    /// For each column in the source schema:
    /// - If the column's schema matches a column in the `RecordBatch` (both the data type and column name are the same),
    ///   the corresponding records are converted into a column of the `StreamChunk`.
    /// - If the column's schema does not match, null values are inserted.
    /// - Hidden columns are handled separately by filling in the appropriate fields to ensure the data chunk maintains the correct format.
    /// - If a column in the Parquet file does not exist in the source schema, it is skipped.
    ///
    /// # Arguments
    ///
    /// * `record_batch` - The `RecordBatch` to be converted into a `StreamChunk`.
    ///
    /// # Returns
    ///
    /// A `StreamChunk` containing the converted data from the `RecordBatch`.
    ///
    /// The hidden columns that must be included here are `_rw_file` and `_rw_offset`.
    /// Depending on whether the user specifies a primary key (pk), there may be an additional hidden column `row_id`.
    /// Therefore, the maximum number of hidden columns is three.
    fn convert_record_batch_to_stream_chunk(
        &mut self,
        record_batch: RecordBatch,
        file_source_input_row_count_metrics: Option<
            risingwave_common::metrics::LabelGuardedMetric<
                prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
                4,
            >,
        >,
        parquet_source_skip_row_count_metrics: Option<
            risingwave_common::metrics::LabelGuardedMetric<
                prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
                4,
            >,
        >,
    ) -> Result<StreamChunk, crate::error::ConnectorError> {
        const MAX_HIDDEN_COLUMN_NUMS: usize = 3;
        let column_size = self.rw_columns.len();
        let mut chunk_columns = Vec::with_capacity(self.rw_columns.len() + MAX_HIDDEN_COLUMN_NUMS);

        // Clone the column descriptors so that `self` can still be mutably borrowed
        // (for `inc_offset`) inside the loop.
        for source_column in self.rw_columns.clone() {
            match source_column.column_type {
                crate::source::SourceColumnType::Normal => {
                    let rw_data_type: &risingwave_common::types::DataType =
                        &source_column.data_type;
                    let rw_column_name = &source_column.name;

                    if let Some(parquet_column) = record_batch.column_by_name(rw_column_name)
                        && is_parquet_schema_match_source_schema(
                            parquet_column.data_type(),
                            rw_data_type,
                        )
                    {
                        let arrow_field =
                            IcebergArrowConvert.to_arrow_field(rw_column_name, rw_data_type)?;
                        let array_impl = IcebergArrowConvert
                            .array_from_arrow_array(&arrow_field, parquet_column)?;
                        chunk_columns.push(Arc::new(array_impl));
                    } else {
                        // Handle additional columns: for the file source, the additional columns
                        // are the offset and the file name. Columns defined in the user schema but
                        // not present in the parquet file are filled with null.
                        let column = if let Some(additional_column_type) =
                            &source_column.additional_column.column_type
                        {
                            match additional_column_type {
                                risingwave_pb::plan_common::additional_column::ColumnType::Offset(_) => {
                                    let mut array_builder = ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                                    for _ in 0..record_batch.num_rows() {
                                        let datum: Datum = Some(ScalarImpl::Utf8((self.offset).to_string().into()));
                                        self.inc_offset();
                                        array_builder.append(datum);
                                    }
                                    Arc::new(array_builder.finish())
                                }
                                risingwave_pb::plan_common::additional_column::ColumnType::Filename(_) => {
                                    let mut array_builder = ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                                    let datum: Datum = Some(ScalarImpl::Utf8(self.file_name.clone().into()));
                                    array_builder.append_n(record_batch.num_rows(), datum);
                                    Arc::new(array_builder.finish())
                                }
                                _ => unreachable!(),
                            }
                        } else {
                            // For columns defined in the source schema but not present in the Parquet file, fill with null values.
                            let mut array_builder =
                                ArrayBuilderImpl::with_type(column_size, rw_data_type.clone());
                            array_builder.append_n_null(record_batch.num_rows());
                            if let Some(metrics) = parquet_source_skip_row_count_metrics.clone() {
                                metrics.inc_by(record_batch.num_rows() as u64);
                            }
                            Arc::new(array_builder.finish())
                        };
                        chunk_columns.push(column);
                    }
                }
                crate::source::SourceColumnType::RowId => {
                    let mut array_builder =
                        ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                    let datum: Datum = None;
                    array_builder.append_n(record_batch.num_rows(), datum);
                    let res = array_builder.finish();
                    let column = Arc::new(res);
                    chunk_columns.push(column);
                }
                // The following fields are only used in CDC sources.
                crate::source::SourceColumnType::Offset | crate::source::SourceColumnType::Meta => {
                    unreachable!()
                }
            }
        }
        if let Some(metrics) = file_source_input_row_count_metrics {
            metrics.inc_by(record_batch.num_rows() as u64);
        }

        let data_chunk = DataChunk::new(chunk_columns, record_batch.num_rows());
        Ok(data_chunk.into())
    }
}
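
// The sketch below is not part of the original source: it is a minimal, test-style
// illustration of how a hidden column such as `_rw_file` is materialized with
// `ArrayBuilderImpl`, mirroring the `Filename` branch in
// `convert_record_batch_to_stream_chunk`. The file name and row count are assumed
// values chosen purely for illustration.
#[cfg(test)]
mod hidden_column_example {
    use std::sync::Arc;

    use risingwave_common::array::ArrayBuilderImpl;
    use risingwave_common::types::{DataType, Datum, ScalarImpl};

    #[test]
    fn fill_file_name_column() {
        let num_rows = 4;
        // Build a varchar column that repeats the file name once per row,
        // just as the `Filename` additional column is filled above.
        let mut builder = ArrayBuilderImpl::with_type(num_rows, DataType::Varchar);
        let datum: Datum = Some(ScalarImpl::Utf8("part-0001.parquet".into()));
        builder.append_n(num_rows, datum);
        let column = Arc::new(builder.finish());
        assert_eq!(column.len(), num_rows);
    }
}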