risingwave_connector/parser/parquet_parser.rs

use std::sync::Arc;

use futures_async_stream::try_stream;
use prometheus::core::GenericCounter;
use risingwave_common::array::arrow::arrow_array_iceberg::{ArrayRef, RecordBatch};
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::types::{Datum, ScalarImpl};
use thiserror_ext::AsReport;

use crate::parser::ConnectorResult;
use crate::source::SourceColumnDesc;
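
/// Parses Arrow `RecordBatch`es decoded from a Parquet file into RisingWave
/// `StreamChunk`s, tracking a per-file row offset as it goes.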
#[derive(Debug)]
pub struct ParquetParser {
    rw_columns: Vec<SourceColumnDesc>,
    file_name: String,
    offset: usize,
    case_insensitive: bool,
}

impl ParquetParser {
    pub fn new(
        rw_columns: Vec<SourceColumnDesc>,
        file_name: String,
        offset: usize,
        case_insensitive: bool,
    ) -> ConnectorResult<Self> {
        Ok(Self {
            rw_columns,
            file_name,
            offset,
            case_insensitive,
        })
    }

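    /// Drives the Parquet record-batch stream to completion, converting each
    /// batch into a `StreamChunk` and yielding the chunks in order.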
    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
    pub async fn into_stream(
        mut self,
        record_batch_stream: parquet::arrow::async_reader::ParquetRecordBatchStream<
            tokio_util::compat::Compat<opendal::FuturesAsyncReader>,
        >,
        file_source_input_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
        parquet_source_skip_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
    ) {
        #[for_await]
        for record_batch in record_batch_stream {
            let record_batch: RecordBatch = record_batch?;
            let chunk: StreamChunk = self.convert_record_batch_to_stream_chunk(
                record_batch,
                file_source_input_row_count_metrics.clone(),
                parquet_source_skip_row_count_metrics.clone(),
            )?;

            yield chunk;
        }
    }

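    /// Advances the row offset recorded in the hidden offset column.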
    fn inc_offset(&mut self) {
        self.offset += 1;
    }

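    /// Converts one Arrow `RecordBatch` into a `StreamChunk`: columns that
    /// exist in the Parquet file with a matching type are converted directly;
    /// additional columns (offset, filename) are synthesized; anything else
    /// is filled with NULLs.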
    fn convert_record_batch_to_stream_chunk(
        &mut self,
        record_batch: RecordBatch,
        file_source_input_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
        parquet_source_skip_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
    ) -> Result<StreamChunk, crate::error::ConnectorError> {
        const MAX_HIDDEN_COLUMN_NUMS: usize = 3;
        let column_size = self.rw_columns.len();
        let mut chunk_columns = Vec::with_capacity(self.rw_columns.len() + MAX_HIDDEN_COLUMN_NUMS);

        for source_column in self.rw_columns.clone() {
            match source_column.column_type {
                crate::source::SourceColumnType::Normal => {
                    let rw_data_type: &risingwave_common::types::DataType =
                        &source_column.data_type;
                    let rw_column_name = &source_column.name;
                    if let Some(parquet_column) =
                        self.find_parquet_column(&record_batch, rw_column_name)
                        && is_parquet_schema_match_source_schema(
                            parquet_column.data_type(),
                            rw_data_type,
                        )
                    {
                        let arrow_field = IcebergArrowConvert
                            .to_arrow_field(rw_column_name, rw_data_type)
                            .map_err(|e| crate::parser::AccessError::ParquetParser {
                                message: format!(
                                    "to_arrow_field failed, column='{}', rw_type='{}', offset={}, error={}",
                                    rw_column_name, rw_data_type, self.offset, e.as_report()
                                ),
                            })?;
                        let array_impl = IcebergArrowConvert
                            .array_from_arrow_array(&arrow_field, parquet_column)
                            .map_err(|e| crate::parser::AccessError::ParquetParser {
                                message: format!(
                                    "array_from_arrow_array failed, column='{}', rw_type='{}', arrow_field='{}', parquet_type='{}', offset={}, error={}",
                                    rw_column_name,
                                    rw_data_type,
                                    arrow_field.data_type(),
                                    parquet_column.data_type(),
                                    self.offset,
                                    e.as_report()
                                ),
                            })?;
                        chunk_columns.push(Arc::new(array_impl));
                    } else {
                        let column = if let Some(additional_column_type) =
                            &source_column.additional_column.column_type
                        {
                            match additional_column_type {
                                risingwave_pb::plan_common::additional_column::ColumnType::Offset(_) => {
                                    let mut array_builder = ArrayBuilderImpl::with_type(
                                        column_size,
                                        source_column.data_type.clone(),
                                    );
                                    for _ in 0..record_batch.num_rows() {
                                        let datum: Datum =
                                            Some(ScalarImpl::Utf8(self.offset.to_string().into()));
                                        self.inc_offset();
                                        array_builder.append(datum);
                                    }
                                    Arc::new(array_builder.finish())
                                }
                                risingwave_pb::plan_common::additional_column::ColumnType::Filename(_) => {
                                    let mut array_builder = ArrayBuilderImpl::with_type(
                                        column_size,
                                        source_column.data_type.clone(),
                                    );
                                    let datum: Datum =
                                        Some(ScalarImpl::Utf8(self.file_name.clone().into()));
                                    array_builder.append_n(record_batch.num_rows(), datum);
                                    Arc::new(array_builder.finish())
                                }
                                _ => unreachable!(),
                            }
                        } else {
                            // The source column has no counterpart in the Parquet
                            // file: fill it with NULLs and count the skipped rows.
                            let mut array_builder =
                                ArrayBuilderImpl::with_type(column_size, rw_data_type.clone());
                            array_builder.append_n_null(record_batch.num_rows());
                            if let Some(metrics) = parquet_source_skip_row_count_metrics.clone() {
                                metrics.inc_by(record_batch.num_rows() as u64);
                            }
                            Arc::new(array_builder.finish())
                        };
                        chunk_columns.push(column);
                    }
                }
                crate::source::SourceColumnType::RowId => {
                    let mut array_builder =
                        ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                    let datum: Datum = None;
                    array_builder.append_n(record_batch.num_rows(), datum);
                    chunk_columns.push(Arc::new(array_builder.finish()));
                }
                crate::source::SourceColumnType::Offset | crate::source::SourceColumnType::Meta => {
                    unreachable!()
                }
            }
        }
        if let Some(metrics) = file_source_input_row_count_metrics {
            metrics.inc_by(record_batch.num_rows() as u64);
        }

        let data_chunk = DataChunk::new(chunk_columns, record_batch.num_rows());
        Ok(data_chunk.into())
    }

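    /// Finds the column matching `column_name` in the record batch. Tries an
    /// exact match first; if `case_insensitive` is set, falls back to an ASCII
    /// case-insensitive scan and returns `None` when the name is ambiguous.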
    fn find_parquet_column<'a>(
        &self,
        record_batch: &'a RecordBatch,
        column_name: &str,
    ) -> Option<&'a ArrayRef> {
        if let Some(column) = record_batch.column_by_name(column_name) {
            return Some(column);
        }
        if !self.case_insensitive {
            return None;
        }
        let schema = record_batch.schema();
        let mut matched_index: Option<usize> = None;
        for (index, field) in schema.fields().iter().enumerate() {
            if field.name().eq_ignore_ascii_case(column_name) {
                // More than one case-insensitive match: treat as not found.
                if matched_index.is_some() {
                    return None;
                }
                matched_index = Some(index);
            }
        }
        matched_index.map(|index| record_batch.column(index))
    }
}
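
// Usage sketch (not part of the original file): one way a caller might wire
// `ParquetParser` to a Parquet object read through opendal. This is a minimal,
// hedged sketch: `example_scan` is a hypothetical helper, and the opendal
// (`into_futures_async_read`) and parquet (`ParquetRecordBatchStreamBuilder`)
// calls follow recent crate versions; exact signatures may differ in the
// version pinned by this workspace.
//
// use futures::TryStreamExt;
// use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
// use tokio_util::compat::FuturesAsyncReadCompatExt;
//
// async fn example_scan(
//     op: opendal::Operator,
//     path: String,
//     rw_columns: Vec<SourceColumnDesc>,
// ) -> ConnectorResult<()> {
//     // Adapt opendal's futures-io reader to the tokio `AsyncRead` expected
//     // by the parquet crate (`Compat<FuturesAsyncReader>`, matching the
//     // stream type accepted by `into_stream`).
//     let file_len = op.stat(&path).await?.content_length();
//     let reader = op
//         .reader(&path)
//         .await?
//         .into_futures_async_read(0..file_len)
//         .await?
//         .compat();
//
//     // Build the async record-batch stream and feed it through the parser.
//     let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
//         .await?
//         .build()?;
//     let parser = ParquetParser::new(rw_columns, path, 0, false)?;
//     let mut chunks = parser.into_stream(record_batch_stream, None, None);
//     while let Some(chunk) = chunks.try_next().await? {
//         // Each chunk carries the converted Parquet columns plus the
//         // synthesized hidden columns (offset / filename / row id).
//         let _ = chunk;
//     }
//     Ok(())
// }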