risingwave_connector/parser/parquet_parser.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures_async_stream::try_stream;
use prometheus::core::GenericCounter;
use risingwave_common::array::arrow::arrow_array_iceberg::{ArrayRef, RecordBatch};
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::types::{Datum, ScalarImpl};
use thiserror_ext::AsReport;

use crate::parser::ConnectorResult;
use crate::source::SourceColumnDesc;

/// `ParquetParser` is responsible for converting the incoming `record_batch_stream`
/// into a `StreamChunk`.
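///
/// A minimal usage sketch (the column descriptors and the opendal-backed record batch
/// stream are assumed to be prepared by the surrounding file source; this is illustrative,
/// not a stable API):
///
/// ```rust,ignore
/// use futures::StreamExt;
///
/// let parser = ParquetParser::new(rw_columns, file_name, 0, /* case_insensitive */ false)?;
/// let mut chunks = parser.into_stream(record_batch_stream, None, None);
/// while let Some(chunk) = chunks.next().await {
///     let chunk: StreamChunk = chunk?;
///     // Hand the chunk to the downstream source executor.
/// }
/// ```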
#[derive(Debug)]
pub struct ParquetParser {
    rw_columns: Vec<SourceColumnDesc>,
    file_name: String,
    /// Row offset within the current file; backs the `_rw_offset` hidden column.
    offset: usize,
    /// Whether Parquet column names are matched against the source schema ignoring ASCII case.
    case_insensitive: bool,
}

impl ParquetParser {
    pub fn new(
        rw_columns: Vec<SourceColumnDesc>,
        file_name: String,
        offset: usize,
        case_insensitive: bool,
    ) -> ConnectorResult<Self> {
        Ok(Self {
            rw_columns,
            file_name,
            offset,
            case_insensitive,
        })
    }

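    /// Converts the incoming Parquet record batch stream into a stream of `StreamChunk`s,
    /// emitting one chunk per record batch and updating the optional file-source row-count
    /// metrics as batches are consumed.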
    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
    pub async fn into_stream(
        mut self,
        record_batch_stream: parquet::arrow::async_reader::ParquetRecordBatchStream<
            tokio_util::compat::Compat<opendal::FuturesAsyncReader>,
        >,
        file_source_input_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
        parquet_source_skip_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
    ) {
        #[for_await]
        for record_batch in record_batch_stream {
            let record_batch: RecordBatch = record_batch?;
            // Convert each record batch into a stream chunk according to the user-defined schema.
            let chunk: StreamChunk = self.convert_record_batch_to_stream_chunk(
                record_batch,
                file_source_input_row_count_metrics.clone(),
                parquet_source_skip_row_count_metrics.clone(),
            )?;

            yield chunk;
        }
    }

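    /// Advances the per-file row offset that is written into the `_rw_offset` hidden column.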
    fn inc_offset(&mut self) {
        self.offset += 1;
    }

    /// Transforms the given `RecordBatch` into a `StreamChunk`.
    ///
    /// For each column in the source schema:
    /// - If the column matches a column in the `RecordBatch` (both the data type and the column name are the same),
    ///   the corresponding records are converted into a column of the `StreamChunk`.
    /// - If no matching Parquet column exists, the column is filled with null values.
    /// - Hidden columns are handled separately by filling in the appropriate fields so that the data chunk keeps the correct format.
    /// - If a column in the Parquet file does not exist in the source schema, it is skipped.
    ///
    /// # Arguments
    ///
    /// * `record_batch` - The `RecordBatch` to be converted into a `StreamChunk`.
    /// * `file_source_input_row_count_metrics` - Optional counter incremented by the batch's row count.
    /// * `parquet_source_skip_row_count_metrics` - Optional counter incremented by the batch's row count for every source column absent from the Parquet file.
    ///
    /// # Returns
    ///
    /// A `StreamChunk` containing the converted data from the `RecordBatch`.
    ///
    /// The hidden columns that must be included here are `_rw_file` and `_rw_offset`.
    /// Depending on whether the user specifies a primary key (pk), there may be an additional hidden column `row_id`.
    /// Therefore, the maximum number of hidden columns is three.
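    ///
    /// For example (illustrative schemas, not taken from a real workload): with a source schema
    /// `(id INT, name VARCHAR)` plus the hidden columns, and a Parquet file containing only `id`
    /// and an unrelated `extra_col`, the resulting chunk carries `id` converted from the file,
    /// `name` filled with nulls, `_rw_file`/`_rw_offset`/`row_id` populated by the parser, and
    /// `extra_col` dropped.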
    fn convert_record_batch_to_stream_chunk(
        &mut self,
        record_batch: RecordBatch,
        file_source_input_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
        parquet_source_skip_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
    ) -> Result<StreamChunk, crate::error::ConnectorError> {
        const MAX_HIDDEN_COLUMN_NUMS: usize = 3;
        let column_size = self.rw_columns.len();
        let mut chunk_columns = Vec::with_capacity(self.rw_columns.len() + MAX_HIDDEN_COLUMN_NUMS);

        for source_column in self.rw_columns.clone() {
            match source_column.column_type {
                crate::source::SourceColumnType::Normal => {
                    let rw_data_type: &risingwave_common::types::DataType =
                        &source_column.data_type;
                    let rw_column_name = &source_column.name;
                    if let Some(parquet_column) =
                        self.find_parquet_column(&record_batch, rw_column_name)
                        && is_parquet_schema_match_source_schema(
                            parquet_column.data_type(),
                            rw_data_type,
                        )
                    {
                        let arrow_field = IcebergArrowConvert
                            .to_arrow_field(rw_column_name, rw_data_type)
                            .map_err(|e| {
                                crate::parser::AccessError::ParquetParser {
                                    message: format!(
                                        "to_arrow_field failed, column='{}', rw_type='{}', offset={}, error={}",
                                        rw_column_name, rw_data_type, self.offset, e.as_report()
                                    )
                                }
                            })?;
                        let array_impl = IcebergArrowConvert
                            .array_from_arrow_array(&arrow_field, parquet_column)
                            .map_err(|e| {
                                crate::parser::AccessError::ParquetParser {
                                    message: format!(
                                        "array_from_arrow_array failed, column='{}', rw_type='{}', arrow_field='{}', parquet_type='{}', offset={}, error={}",
                                        rw_column_name,
                                        rw_data_type,
                                        arrow_field.data_type(),
                                        parquet_column.data_type(),
                                        self.offset,
                                        e.as_report()
                                    )
                                }
                            })?;
                        chunk_columns.push(Arc::new(array_impl));
                    } else {
                        // Handle additional columns: for the file source these are the offset and
                        // the file name. Columns defined in the user schema but missing from the
                        // Parquet file are filled with nulls instead.
                        let column = if let Some(additional_column_type) =
                            &source_column.additional_column.column_type
                        {
                            match additional_column_type {
                                risingwave_pb::plan_common::additional_column::ColumnType::Offset(_) => {
                                    let mut array_builder = ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                                    for _ in 0..record_batch.num_rows() {
                                        let datum: Datum = Some(ScalarImpl::Utf8((self.offset).to_string().into()));
                                        self.inc_offset();
                                        array_builder.append(datum);
                                    }
                                    Arc::new(array_builder.finish())
                                }
                                risingwave_pb::plan_common::additional_column::ColumnType::Filename(_) => {
                                    let mut array_builder = ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                                    let datum: Datum = Some(ScalarImpl::Utf8(self.file_name.clone().into()));
                                    array_builder.append_n(record_batch.num_rows(), datum);
                                    Arc::new(array_builder.finish())
                                }
                                _ => unreachable!(),
                            }
                        } else {
                            // Columns defined in the source schema but not present in the Parquet file are filled with nulls.
                            let mut array_builder =
                                ArrayBuilderImpl::with_type(column_size, rw_data_type.clone());
                            array_builder.append_n_null(record_batch.num_rows());
                            if let Some(metrics) = parquet_source_skip_row_count_metrics.clone() {
                                metrics.inc_by(record_batch.num_rows() as u64);
                            }
                            Arc::new(array_builder.finish())
                        };
                        chunk_columns.push(column);
                    }
                }
                crate::source::SourceColumnType::RowId => {
                    let mut array_builder =
                        ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                    let datum: Datum = None;
                    array_builder.append_n(record_batch.num_rows(), datum);
                    let res = array_builder.finish();
                    let column = Arc::new(res);
                    chunk_columns.push(column);
                }
                // The following column types are only used by CDC sources.
                crate::source::SourceColumnType::Offset | crate::source::SourceColumnType::Meta => {
                    unreachable!()
                }
            }
        }
        if let Some(metrics) = file_source_input_row_count_metrics {
            metrics.inc_by(record_batch.num_rows() as u64);
        }

        let data_chunk = DataChunk::new(chunk_columns, record_batch.num_rows());
        Ok(data_chunk.into())
    }

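    /// Looks up a column in the `RecordBatch` by name. An exact match wins; when
    /// `case_insensitive` is set, falls back to an ASCII case-insensitive scan and
    /// returns `None` if the match is ambiguous (more than one column differs only by case)
    /// or no column matches at all.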
    fn find_parquet_column<'a>(
        &self,
        record_batch: &'a RecordBatch,
        column_name: &str,
    ) -> Option<&'a ArrayRef> {
        if let Some(column) = record_batch.column_by_name(column_name) {
            return Some(column);
        }
        if !self.case_insensitive {
            return None;
        }
        let schema = record_batch.schema();
        let mut matched_index: Option<usize> = None;
        for (index, field) in schema.fields().iter().enumerate() {
            if field.name().eq_ignore_ascii_case(column_name) {
                if matched_index.is_some() {
                    return None;
                }
                matched_index = Some(index);
            }
        }
        matched_index.map(|index| record_batch.column(index))
    }
}