// risingwave_connector/parser/parquet_parser.rs

use std::sync::Arc;

use futures_async_stream::try_stream;
use prometheus::core::GenericCounter;
use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::types::{Datum, ScalarImpl};
use thiserror_ext::AsReport;

use crate::parser::ConnectorResult;
use crate::source::SourceColumnDesc;

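/// `ParquetParser` converts a stream of Arrow `RecordBatch`es read from a
/// parquet file into `StreamChunk`s, tracking the current row offset within
/// the file.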
#[derive(Debug)]
pub struct ParquetParser {
    rw_columns: Vec<SourceColumnDesc>,
    file_name: String,
    offset: usize,
}

impl ParquetParser {
    pub fn new(
        rw_columns: Vec<SourceColumnDesc>,
        file_name: String,
        offset: usize,
    ) -> ConnectorResult<Self> {
        Ok(Self {
            rw_columns,
            file_name,
            offset,
        })
    }

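    /// Converts the incoming `record_batch_stream` into a stream of
    /// `StreamChunk`s, yielding one chunk per `RecordBatch`.
    ///
    /// A minimal consumption sketch; building `record_batch_stream` (e.g. with
    /// `parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder` over an
    /// OpenDAL reader) is assumed to happen elsewhere:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let parser = ParquetParser::new(rw_columns, file_name, 0)?;
    /// let mut chunk_stream = parser.into_stream(record_batch_stream, None, None);
    /// while let Some(chunk) = chunk_stream.next().await {
    ///     let chunk: StreamChunk = chunk?;
    ///     // consume the chunk ...
    /// }
    /// ```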
    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
    pub async fn into_stream(
        mut self,
        record_batch_stream: parquet::arrow::async_reader::ParquetRecordBatchStream<
            tokio_util::compat::Compat<opendal::FuturesAsyncReader>,
        >,
        file_source_input_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
        parquet_source_skip_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
    ) {
        #[for_await]
        for record_batch in record_batch_stream {
            let record_batch: RecordBatch = record_batch?;
            let chunk: StreamChunk = self.convert_record_batch_to_stream_chunk(
                record_batch,
                file_source_input_row_count_metrics.clone(),
                parquet_source_skip_row_count_metrics.clone(),
            )?;

            yield chunk;
        }
    }

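    /// Advances the row offset within the current file by one row.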
    fn inc_offset(&mut self) {
        self.offset += 1;
    }

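    /// Converts an Arrow `RecordBatch` into a `StreamChunk`, matching each
    /// requested source column against the batch by name:
    /// - a column present in the parquet file with a compatible type is
    ///   converted via `IcebergArrowConvert`;
    /// - the hidden offset and file-name columns are generated here;
    /// - any other missing (or type-mismatched) column is filled with NULLs,
    ///   counted by the skip-row metric;
    /// - the row-id column is appended as NULLs.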
    fn convert_record_batch_to_stream_chunk(
        &mut self,
        record_batch: RecordBatch,
        file_source_input_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
        parquet_source_skip_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
    ) -> Result<StreamChunk, crate::error::ConnectorError> {
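        // Reserve extra capacity for the hidden columns (row id, file name and
        // offset) that may be appended in addition to the user columns.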
        const MAX_HIDDEN_COLUMN_NUMS: usize = 3;
        let column_size = self.rw_columns.len();
        let mut chunk_columns = Vec::with_capacity(column_size + MAX_HIDDEN_COLUMN_NUMS);

        for source_column in self.rw_columns.clone() {
            match source_column.column_type {
                crate::source::SourceColumnType::Normal => {
                    let rw_data_type: &risingwave_common::types::DataType =
                        &source_column.data_type;
                    let rw_column_name = &source_column.name;
                    if let Some(parquet_column) = record_batch.column_by_name(rw_column_name)
                        && is_parquet_schema_match_source_schema(
                            parquet_column.data_type(),
                            rw_data_type,
                        )
                    {
                        let arrow_field = IcebergArrowConvert
                            .to_arrow_field(rw_column_name, rw_data_type)
                            .map_err(|e| crate::parser::AccessError::ParquetParser {
                                message: format!(
                                    "to_arrow_field failed, column='{}', rw_type='{}', offset={}, error={}",
                                    rw_column_name, rw_data_type, self.offset, e.as_report()
                                ),
                            })?;
                        let array_impl = IcebergArrowConvert
                            .array_from_arrow_array(&arrow_field, parquet_column)
                            .map_err(|e| crate::parser::AccessError::ParquetParser {
                                message: format!(
                                    "array_from_arrow_array failed, column='{}', rw_type='{}', arrow_field='{}', parquet_type='{}', offset={}, error={}",
                                    rw_column_name,
                                    rw_data_type,
                                    arrow_field.data_type(),
                                    parquet_column.data_type(),
                                    self.offset,
                                    e.as_report()
                                ),
                            })?;
                        chunk_columns.push(Arc::new(array_impl));
                    } else {
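                        // The parquet file lacks this column (or its type does
                        // not match the source schema): either generate a hidden
                        // offset/file-name column, or fill the column with NULLs.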
                        let column = if let Some(additional_column_type) =
                            &source_column.additional_column.column_type
                        {
                            match additional_column_type {
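                                // Hidden offset column: one monotonically
                                // increasing value per row of this file.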
                                risingwave_pb::plan_common::additional_column::ColumnType::Offset(_) => {
                                    let mut array_builder = ArrayBuilderImpl::with_type(
                                        column_size,
                                        source_column.data_type.clone(),
                                    );
                                    for _ in 0..record_batch.num_rows() {
                                        let datum: Datum =
                                            Some(ScalarImpl::Utf8(self.offset.to_string().into()));
                                        self.inc_offset();
                                        array_builder.append(datum);
                                    }
                                    Arc::new(array_builder.finish())
                                }
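                                // Hidden file-name column: the same value for
                                // every row of the batch.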
                                risingwave_pb::plan_common::additional_column::ColumnType::Filename(_) => {
                                    let mut array_builder = ArrayBuilderImpl::with_type(
                                        column_size,
                                        source_column.data_type.clone(),
                                    );
                                    let datum: Datum =
                                        Some(ScalarImpl::Utf8(self.file_name.clone().into()));
                                    array_builder.append_n(record_batch.num_rows(), datum);
                                    Arc::new(array_builder.finish())
                                }
                                _ => unreachable!(),
                            }
                        } else {
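                            // Not an additional column: the user column simply
                            // has no counterpart in this file, so fill it with
                            // NULLs and bump the skip-row metric.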
                            let mut array_builder =
                                ArrayBuilderImpl::with_type(column_size, rw_data_type.clone());
                            array_builder.append_n_null(record_batch.num_rows());
                            if let Some(metrics) = parquet_source_skip_row_count_metrics.clone() {
                                metrics.inc_by(record_batch.num_rows() as u64);
                            }
                            Arc::new(array_builder.finish())
                        };
                        chunk_columns.push(column);
                    }
                }
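                // The hidden row-id column is appended as NULLs here; actual
                // row ids are assigned downstream.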
                crate::source::SourceColumnType::RowId => {
                    let mut array_builder =
                        ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                    let datum: Datum = None;
                    array_builder.append_n(record_batch.num_rows(), datum);
                    chunk_columns.push(Arc::new(array_builder.finish()));
                }
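                // Offset and meta column types are not produced for parquet
                // file sources, so they cannot appear here.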
                crate::source::SourceColumnType::Offset | crate::source::SourceColumnType::Meta => {
                    unreachable!()
                }
            }
        }
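        // Every row of the batch counts as source input, regardless of how
        // many columns were actually read from the file.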
        if let Some(metrics) = file_source_input_row_count_metrics {
            metrics.inc_by(record_batch.num_rows() as u64);
        }

        let data_chunk = DataChunk::new(chunk_columns, record_batch.num_rows());
        Ok(data_chunk.into())
    }
}