// risingwave_connector/parser/parquet_parser.rs

use std::sync::Arc;

use futures_async_stream::try_stream;
use prometheus::core::GenericCounter;
use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::types::{Datum, ScalarImpl};
use thiserror_ext::AsReport;

use crate::parser::ConnectorResult;
use crate::source::SourceColumnDesc;
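
/// Parses a stream of parquet `RecordBatch`es into RisingWave [`StreamChunk`]s.
///
/// `offset` tracks how many rows of the current file have already been
/// emitted, so the hidden offset column can record each row's position
/// within `file_name`.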
#[derive(Debug)]
pub struct ParquetParser {
    rw_columns: Vec<SourceColumnDesc>,
    file_name: String,
    offset: usize,
}

impl ParquetParser {
    pub fn new(
        rw_columns: Vec<SourceColumnDesc>,
        file_name: String,
        offset: usize,
    ) -> ConnectorResult<Self> {
        Ok(Self {
            rw_columns,
            file_name,
            offset,
        })
    }
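
    /// Converts an async stream of parquet `RecordBatch`es into a stream of
    /// [`StreamChunk`]s, one chunk per record batch. The optional counters
    /// track total input rows and rows NULL-padded because a declared column
    /// is absent from the file.
    ///
    /// A sketch of the intended call pattern (setup elided, metrics disabled):
    ///
    /// ```ignore
    /// let parser = ParquetParser::new(rw_columns, file_name, 0)?;
    /// #[for_await]
    /// for chunk in parser.into_stream(record_batch_stream, None, None) {
    ///     let chunk: StreamChunk = chunk?;
    ///     // ...hand the chunk to the downstream executor
    /// }
    /// ```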
    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
    pub async fn into_stream(
        mut self,
        record_batch_stream: parquet::arrow::async_reader::ParquetRecordBatchStream<
            tokio_util::compat::Compat<opendal::FuturesAsyncReader>,
        >,
        file_source_input_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
        parquet_source_skip_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
    ) {
        #[for_await]
        for record_batch in record_batch_stream {
            let record_batch: RecordBatch = record_batch?;
            let chunk: StreamChunk = self.convert_record_batch_to_stream_chunk(
                record_batch,
                file_source_input_row_count_metrics.clone(),
                parquet_source_skip_row_count_metrics.clone(),
            )?;

            yield chunk;
        }
    }
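
    /// Advances the row offset within the current file by one.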
    fn inc_offset(&mut self) {
        self.offset += 1;
    }
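
    /// Converts one parquet `RecordBatch` into a [`StreamChunk`].
    ///
    /// Each declared source column is either copied from the parquet column of
    /// the same name (when the types are compatible) or synthesized: hidden
    /// offset / filename columns are filled from parser state, missing plain
    /// columns are NULL-padded, and the row-id column is left NULL.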
    fn convert_record_batch_to_stream_chunk(
        &mut self,
        record_batch: RecordBatch,
        file_source_input_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
        parquet_source_skip_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
    ) -> Result<StreamChunk, crate::error::ConnectorError> {
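        // Reserve capacity for the user-visible columns plus up to three
        // hidden columns (offset, filename, row id) handled below.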
        const MAX_HIDDEN_COLUMN_NUMS: usize = 3;
        let column_size = self.rw_columns.len();
        let mut chunk_columns = Vec::with_capacity(self.rw_columns.len() + MAX_HIDDEN_COLUMN_NUMS);

        for source_column in self.rw_columns.clone() {
            match source_column.column_type {
                crate::source::SourceColumnType::Normal => {
                    let rw_data_type: &risingwave_common::types::DataType =
                        &source_column.data_type;
                    let rw_column_name = &source_column.name;
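                    // Copy the parquet column only if one with the same name
                    // exists and its arrow type is compatible with the
                    // declared source type.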
                    if let Some(parquet_column) = record_batch.column_by_name(rw_column_name)
                        && is_parquet_schema_match_source_schema(
                            parquet_column.data_type(),
                            rw_data_type,
                        )
                    {
                        let arrow_field = IcebergArrowConvert
                            .to_arrow_field(rw_column_name, rw_data_type)
                            .map_err(|e| crate::parser::AccessError::ParquetParser {
                                message: format!(
                                    "to_arrow_field failed, column='{}', rw_type='{}', offset={}, error={}",
                                    rw_column_name, rw_data_type, self.offset, e.as_report()
                                ),
                            })?;
                        let array_impl = IcebergArrowConvert
                            .array_from_arrow_array(&arrow_field, parquet_column)
                            .map_err(|e| crate::parser::AccessError::ParquetParser {
                                message: format!(
                                    "array_from_arrow_array failed, column='{}', rw_type='{}', arrow_field='{}', parquet_type='{}', offset={}, error={}",
                                    rw_column_name,
                                    rw_data_type,
                                    arrow_field.data_type(),
                                    parquet_column.data_type(),
                                    self.offset,
                                    e.as_report()
                                ),
                            })?;
                        chunk_columns.push(Arc::new(array_impl));
                    } else {
                        // The declared column cannot be read from the parquet
                        // file directly, so synthesize it instead.
                        let column = if let Some(additional_column_type) =
                            &source_column.additional_column.column_type
                        {
                            match additional_column_type {
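                                // Hidden offset column: each row records its
                                // own (string-encoded) position in the file,
                                // advancing the parser's offset as it goes.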
                                risingwave_pb::plan_common::additional_column::ColumnType::Offset(_) => {
                                    let mut array_builder = ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                                    for _ in 0..record_batch.num_rows() {
                                        let datum: Datum = Some(ScalarImpl::Utf8((self.offset).to_string().into()));
                                        self.inc_offset();
                                        array_builder.append(datum);
                                    }
                                    Arc::new(array_builder.finish())
                                }
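                                // Hidden filename column: repeat the file name
                                // for every row in the batch.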
                                risingwave_pb::plan_common::additional_column::ColumnType::Filename(_) => {
                                    let mut array_builder = ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                                    let datum: Datum = Some(ScalarImpl::Utf8(self.file_name.clone().into()));
                                    array_builder.append_n(record_batch.num_rows(), datum);
                                    Arc::new(array_builder.finish())
                                }
                                _ => unreachable!(),
                            }
                        } else {
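                            // A plain column declared in the source schema but
                            // missing from the file: pad with NULLs and count
                            // the rows as skipped.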
                            let mut array_builder =
                                ArrayBuilderImpl::with_type(column_size, rw_data_type.clone());
                            array_builder.append_n_null(record_batch.num_rows());
                            if let Some(metrics) = parquet_source_skip_row_count_metrics.clone() {
                                metrics.inc_by(record_batch.num_rows() as u64);
                            }
                            Arc::new(array_builder.finish())
                        };
                        chunk_columns.push(column);
                    }
                }
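                // The hidden row-id column is appended as all NULLs here.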
                crate::source::SourceColumnType::RowId => {
                    let mut array_builder =
                        ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                    let datum: Datum = None;
                    array_builder.append_n(record_batch.num_rows(), datum);
                    let res = array_builder.finish();
                    let column = Arc::new(res);
                    chunk_columns.push(column);
                }
                // This parser never produces Offset- or Meta-typed source columns.
                crate::source::SourceColumnType::Offset | crate::source::SourceColumnType::Meta => {
                    unreachable!()
                }
            }
        }
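        // Every row in the batch counts toward the source's input-row metric.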
        if let Some(metrics) = file_source_input_row_count_metrics {
            metrics.inc_by(record_batch.num_rows() as u64);
        }

        let data_chunk = DataChunk::new(chunk_columns.clone(), record_batch.num_rows());
        Ok(data_chunk.into())
    }
}