// risingwave_connector/parser/parquet_parser.rs
use std::sync::Arc;
15
16use futures_async_stream::try_stream;
17use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch;
18use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
19use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk};
20use risingwave_common::types::{Datum, ScalarImpl};
21
22use crate::parser::ConnectorResult;
23use crate::source::SourceColumnDesc;
/// Parser that converts Arrow `RecordBatch`es read from a single parquet
/// file into RisingWave `StreamChunk`s.
#[derive(Debug)]
pub struct ParquetParser {
    // Schema of the chunks to produce: the visible source columns plus any
    // hidden/additional ones (row-id, filename, offset).
    rw_columns: Vec<SourceColumnDesc>,
    // Name of the file being parsed; emitted verbatim in the `Filename`
    // additional column.
    file_name: String,
    // Current row offset within the file; advanced once per emitted row and
    // emitted as a string in the `Offset` additional column.
    offset: usize,
}
32
33impl ParquetParser {
34 pub fn new(
35 rw_columns: Vec<SourceColumnDesc>,
36 file_name: String,
37 offset: usize,
38 ) -> ConnectorResult<Self> {
39 Ok(Self {
40 rw_columns,
41 file_name,
42 offset,
43 })
44 }
45
46 #[try_stream(boxed, ok = StreamChunk, error = crate::error::ConnectorError)]
47 pub async fn into_stream(
48 mut self,
49 record_batch_stream: parquet::arrow::async_reader::ParquetRecordBatchStream<
50 tokio_util::compat::Compat<opendal::FuturesAsyncReader>,
51 >,
52 file_source_input_row_count_metrics: Option<
53 risingwave_common::metrics::LabelGuardedMetric<
54 prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
55 4,
56 >,
57 >,
58 parquet_source_skip_row_count_metrics: Option<
59 risingwave_common::metrics::LabelGuardedMetric<
60 prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
61 4,
62 >,
63 >,
64 ) {
65 #[for_await]
66 for record_batch in record_batch_stream {
67 let record_batch: RecordBatch = record_batch?;
68 let chunk: StreamChunk = self.convert_record_batch_to_stream_chunk(
70 record_batch,
71 file_source_input_row_count_metrics.clone(),
72 parquet_source_skip_row_count_metrics.clone(),
73 )?;
74
75 yield chunk;
76 }
77 }
78
79 fn inc_offset(&mut self) {
80 self.offset += 1;
81 }
82
83 fn convert_record_batch_to_stream_chunk(
104 &mut self,
105 record_batch: RecordBatch,
106 file_source_input_row_count_metrics: Option<
107 risingwave_common::metrics::LabelGuardedMetric<
108 prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
109 4,
110 >,
111 >,
112 parquet_source_skip_row_count_metrics: Option<
113 risingwave_common::metrics::LabelGuardedMetric<
114 prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
115 4,
116 >,
117 >,
118 ) -> Result<StreamChunk, crate::error::ConnectorError> {
119 const MAX_HIDDEN_COLUMN_NUMS: usize = 3;
120 let column_size = self.rw_columns.len();
121 let mut chunk_columns = Vec::with_capacity(self.rw_columns.len() + MAX_HIDDEN_COLUMN_NUMS);
122
123 for source_column in self.rw_columns.clone() {
124 match source_column.column_type {
125 crate::source::SourceColumnType::Normal => {
126 let rw_data_type: &risingwave_common::types::DataType =
127 &source_column.data_type;
128 let rw_column_name = &source_column.name;
129
130 if let Some(parquet_column) = record_batch.column_by_name(rw_column_name)
131 && is_parquet_schema_match_source_schema(
132 parquet_column.data_type(),
133 rw_data_type,
134 )
135 {
136 let arrow_field =
137 IcebergArrowConvert.to_arrow_field(rw_column_name, rw_data_type)?;
138 let array_impl = IcebergArrowConvert
139 .array_from_arrow_array(&arrow_field, parquet_column)?;
140 chunk_columns.push(Arc::new(array_impl));
141 } else {
142 let column = if let Some(additional_column_type) =
145 &source_column.additional_column.column_type
146 {
147 match additional_column_type {
148 risingwave_pb::plan_common::additional_column::ColumnType::Offset(_) => {
149 let mut array_builder = ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
150 for _ in 0..record_batch.num_rows() {
151 let datum: Datum = Some(ScalarImpl::Utf8((self.offset).to_string().into()));
152 self.inc_offset();
153 array_builder.append(datum);
154 }
155 Arc::new(array_builder.finish())
156 }
157 risingwave_pb::plan_common::additional_column::ColumnType::Filename(_) => {
158 let mut array_builder = ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
159 let datum: Datum = Some(ScalarImpl::Utf8(self.file_name.clone().into()));
160 array_builder.append_n(record_batch.num_rows(), datum);
161 Arc::new(array_builder.finish())
162 }
163 _ => unreachable!(),
164 }
165 } else {
166 let mut array_builder =
168 ArrayBuilderImpl::with_type(column_size, rw_data_type.clone());
169 array_builder.append_n_null(record_batch.num_rows());
170 if let Some(metrics) = parquet_source_skip_row_count_metrics.clone() {
171 metrics.inc_by(record_batch.num_rows() as u64);
172 }
173 Arc::new(array_builder.finish())
174 };
175 chunk_columns.push(column);
176 }
177 }
178 crate::source::SourceColumnType::RowId => {
179 let mut array_builder =
180 ArrayBuilderImpl::with_type(column_size, source_column.data_type.clone());
181 let datum: Datum = None;
182 array_builder.append_n(record_batch.num_rows(), datum);
183 let res = array_builder.finish();
184 let column = Arc::new(res);
185 chunk_columns.push(column);
186 }
187 crate::source::SourceColumnType::Offset | crate::source::SourceColumnType::Meta => {
189 unreachable!()
190 }
191 }
192 }
193 if let Some(metrics) = file_source_input_row_count_metrics {
194 metrics.inc_by(record_batch.num_rows() as u64);
195 }
196
197 let data_chunk = DataChunk::new(chunk_columns.clone(), record_batch.num_rows());
198 Ok(data_chunk.into())
199 }
200}