// risingwave_connector/parser/parquet_parser.rs

use std::sync::Arc;

use futures_async_stream::try_stream;
use prometheus::core::GenericCounter;
use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::types::{Datum, ScalarImpl};

use crate::parser::ConnectorResult;
use crate::source::SourceColumnDesc;

/// Parses the Arrow `RecordBatch`es decoded from a Parquet file into
/// RisingWave `StreamChunk`s, keeping track of the current row offset within
/// the file.
#[derive(Debug)]
pub struct ParquetParser {
    rw_columns: Vec<SourceColumnDesc>,
    file_name: String,
    offset: usize,
}

impl ParquetParser {
    pub fn new(
        rw_columns: Vec<SourceColumnDesc>,
        file_name: String,
        offset: usize,
    ) -> ConnectorResult<Self> {
        Ok(Self {
            rw_columns,
            file_name,
            offset,
        })
    }

    /// Consumes the parser and turns a stream of Parquet `RecordBatch`es into
    /// a stream of `StreamChunk`s, yielding one chunk per record batch.
    #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
    pub async fn into_stream(
        mut self,
        record_batch_stream: parquet::arrow::async_reader::ParquetRecordBatchStream<
            tokio_util::compat::Compat<opendal::FuturesAsyncReader>,
        >,
        file_source_input_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
        parquet_source_skip_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
    ) {
        #[for_await]
        for record_batch in record_batch_stream {
            let record_batch: RecordBatch = record_batch?;
            let chunk: StreamChunk = self.convert_record_batch_to_stream_chunk(
                record_batch,
                file_source_input_row_count_metrics.clone(),
                parquet_source_skip_row_count_metrics.clone(),
            )?;

            yield chunk;
        }
    }
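
    // A hedged sketch (an assumption, not part of the original file) of how a
    // caller could build the `record_batch_stream` argument with the `parquet`
    // crate, given an `opendal::FuturesAsyncReader` named `reader`:
    //
    //     use parquet::arrow::ParquetRecordBatchStreamBuilder;
    //     use tokio_util::compat::FuturesAsyncReadCompatExt;
    //
    //     let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader.compat())
    //         .await?
    //         .with_batch_size(1024)
    //         .build()?;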

    /// Advances the in-file row offset by one.
    fn inc_offset(&mut self) {
        self.offset += 1;
    }

    /// Converts one Arrow `RecordBatch` into a `StreamChunk`.
    ///
    /// Each source column is matched against the batch by name and type:
    /// - a Parquet column with a matching schema is converted directly via
    ///   `IcebergArrowConvert`;
    /// - the hidden offset and filename columns are synthesized from the
    ///   parser state (`self.offset`, `self.file_name`);
    /// - the row-id column, and any declared column that is missing from (or
    ///   type-incompatible with) the file, is filled with NULLs; rows of
    ///   missing columns are counted by `parquet_source_skip_row_count_metrics`.
    fn convert_record_batch_to_stream_chunk(
        &mut self,
        record_batch: RecordBatch,
        file_source_input_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
        parquet_source_skip_row_count_metrics: Option<
            LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
        >,
    ) -> Result<StreamChunk, crate::error::ConnectorError> {
        // Reserve extra capacity for the hidden columns (offset, filename,
        // row id) on top of the declared source columns.
        const MAX_HIDDEN_COLUMN_NUMS: usize = 3;
        let column_size = self.rw_columns.len();
        let mut chunk_columns = Vec::with_capacity(self.rw_columns.len() + MAX_HIDDEN_COLUMN_NUMS);

        for source_column in self.rw_columns.clone() {
            match source_column.column_type {
                crate::source::SourceColumnType::Normal => {
                    let rw_data_type: &risingwave_common::types::DataType =
                        &source_column.data_type;
                    let rw_column_name = &source_column.name;

                    if let Some(parquet_column) = record_batch.column_by_name(rw_column_name)
                        && is_parquet_schema_match_source_schema(
                            parquet_column.data_type(),
                            rw_data_type,
                        )
                    {
                        // The file provides a compatible column: convert the
                        // Arrow array into a RisingWave array as-is.
                        let arrow_field =
                            IcebergArrowConvert.to_arrow_field(rw_column_name, rw_data_type)?;
                        let array_impl = IcebergArrowConvert
                            .array_from_arrow_array(&arrow_field, parquet_column)?;
                        chunk_columns.push(Arc::new(array_impl));
                    } else {
                        let column = if let Some(additional_column_type) =
                            &source_column.additional_column.column_type
                        {
                            match additional_column_type {
                                risingwave_pb::plan_common::additional_column::ColumnType::Offset(_) => {
                                    // Hidden offset column: one monotonically
                                    // increasing in-file offset per row.
                                    let mut array_builder = ArrayBuilderImpl::with_type(
                                        column_size,
                                        source_column.data_type.clone(),
                                    );
                                    for _ in 0..record_batch.num_rows() {
                                        let datum: Datum =
                                            Some(ScalarImpl::Utf8(self.offset.to_string().into()));
                                        self.inc_offset();
                                        array_builder.append(datum);
                                    }
                                    Arc::new(array_builder.finish())
                                }
                                risingwave_pb::plan_common::additional_column::ColumnType::Filename(_) => {
                                    // Hidden filename column: the file name
                                    // repeated for every row.
                                    let mut array_builder = ArrayBuilderImpl::with_type(
                                        column_size,
                                        source_column.data_type.clone(),
                                    );
                                    let datum: Datum =
                                        Some(ScalarImpl::Utf8(self.file_name.clone().into()));
                                    array_builder.append_n(record_batch.num_rows(), datum);
                                    Arc::new(array_builder.finish())
                                }
                                _ => unreachable!(),
                            }
                        } else {
                            // The column is declared in the source schema but
                            // missing from (or type-incompatible with) this
                            // file: fill it with NULLs and count the rows as
                            // skipped.
                            let mut array_builder =
                                ArrayBuilderImpl::with_type(column_size, rw_data_type.clone());
                            array_builder.append_n_null(record_batch.num_rows());
                            if let Some(metrics) = parquet_source_skip_row_count_metrics.clone() {
                                metrics.inc_by(record_batch.num_rows() as u64);
                            }
                            Arc::new(array_builder.finish())
                        };
                        chunk_columns.push(column);
                    }
                }
                crate::source::SourceColumnType::RowId => {
                    // The row-id column is filled with NULLs here; actual row
                    // ids are assigned further down the pipeline.
                    let mut array_builder =
                        ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
                    let datum: Datum = None;
                    array_builder.append_n(record_batch.num_rows(), datum);
                    chunk_columns.push(Arc::new(array_builder.finish()));
                }
                crate::source::SourceColumnType::Offset | crate::source::SourceColumnType::Meta => {
                    unreachable!()
                }
            }
        }
        if let Some(metrics) = file_source_input_row_count_metrics {
            metrics.inc_by(record_batch.num_rows() as u64);
        }

        // `StreamChunk::from(DataChunk)` marks every row as an `Insert`.
        let data_chunk = DataChunk::new(chunk_columns, record_batch.num_rows());
        Ok(data_chunk.into())
    }
}
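
// A minimal end-to-end usage sketch (an assumption, not part of this file):
// `rw_columns` and `record_batch_stream` are hypothetical placeholders for
// whatever the surrounding file-source reader provides.
//
//     let parser = ParquetParser::new(rw_columns, "demo.parquet".into(), 0)?;
//     #[for_await]
//     for chunk in parser.into_stream(record_batch_stream, None, None) {
//         let chunk: StreamChunk = chunk?;
//         // hand the chunk to the downstream source executor
//     }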