// risingwave_connector/source/iceberg/parquet_file_handler.rs

use std::collections::HashMap;
use std::future::IntoFuture;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

use anyhow::{Context, bail};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, TryFutureExt};
use iceberg::io::{
    FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use itertools::Itertools;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::{Azblob, Gcs, S3};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, parquet_to_arrow_schema};
use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader};
use prometheus::core::GenericCounter;
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
use url::Url;

use crate::error::ConnectorResult;
use crate::parser::ParquetParser;
use crate::source::{Column, SourceColumnDesc};
46
/// Adapter implementing parquet's `AsyncFileReader` on top of an iceberg
/// `FileRead`, carrying the iceberg file metadata so the parquet footer can
/// be located by total file size.
pub struct ParquetFileReader<R: FileRead> {
    // Iceberg file metadata; `meta.size` supplies the total file length.
    meta: FileMetadata,
    // Underlying positional reader for the file contents.
    r: R,
}
51
impl<R: FileRead> ParquetFileReader<R> {
    /// Creates a reader from the file's iceberg metadata and an opened `FileRead`.
    pub fn new(meta: FileMetadata, r: R) -> Self {
        Self { meta, r }
    }
}
57
impl<R: FileRead> AsyncFileReader for ParquetFileReader<R> {
    /// Fetches the requested byte range from the underlying `FileRead`,
    /// wrapping any error as a parquet `External` error.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start as _..range.end as _)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    /// Loads and parses the parquet footer metadata. `self.meta.size` gives
    /// the total file length `ParquetMetaDataReader` needs to find the footer.
    fn get_metadata(
        &mut self,
        _options: Option<&parquet::arrow::arrow_reader::ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new();
            let size = self.meta.size;
            // `load_and_finish` issues range reads through `self` (get_bytes).
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}
81
82pub async fn create_parquet_stream_builder(
83 s3_region: String,
84 s3_access_key: String,
85 s3_secret_key: String,
86 location: String,
87) -> Result<ParquetRecordBatchStreamBuilder<ParquetFileReader<impl FileRead>>, anyhow::Error> {
88 let mut props = HashMap::new();
89 props.insert(S3_REGION, s3_region.clone());
90 props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone());
91 props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone());
92
93 let file_io_builder = FileIOBuilder::new("s3");
94 let file_io = file_io_builder.with_props(props.into_iter()).build()?;
95 let parquet_file = file_io.new_input(&location)?;
96
97 let parquet_metadata = parquet_file.metadata().await?;
98 let parquet_reader = parquet_file.reader().await?;
99 let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);
100
101 ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
102 .await
103 .map_err(Into::into)
104}
105
106pub fn new_s3_operator(
107 s3_region: String,
108 s3_access_key: String,
109 s3_secret_key: String,
110 bucket: String,
111 s3_endpoint: String,
112) -> ConnectorResult<Operator> {
113 let mut builder = S3::default();
114 builder = builder
115 .region(&s3_region)
116 .endpoint(&s3_endpoint)
117 .access_key_id(&s3_access_key)
118 .secret_access_key(&s3_secret_key)
119 .bucket(&bucket)
120 .disable_config_load();
121 let op: Operator = Operator::new(builder)?
122 .layer(LoggingLayer::default())
123 .layer(RetryLayer::default())
124 .finish();
125
126 Ok(op)
127}
128
129pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult<Operator> {
130 let builder = Gcs::default().bucket(&bucket).credential(&credential);
132
133 let operator: Operator = Operator::new(builder)?
134 .layer(LoggingLayer::default())
135 .layer(RetryLayer::default())
136 .finish();
137 Ok(operator)
138}
139
140pub fn new_azblob_operator(
141 endpoint: String,
142 account_name: String,
143 account_key: String,
144 container_name: String,
145) -> ConnectorResult<Operator> {
146 let mut builder = Azblob::default();
148 builder = builder
149 .container(&container_name)
150 .endpoint(&endpoint)
151 .account_name(&account_name)
152 .account_key(&account_key);
153
154 let operator: Operator = Operator::new(builder)?
155 .layer(LoggingLayer::default())
156 .layer(RetryLayer::default())
157 .finish();
158 Ok(operator)
159}
160
/// Object-store backend a file scan reads from. Determines the URL scheme
/// (`s3://`, `gcs://`, `azblob://`) used when parsing and rebuilding
/// file locations.
#[derive(Debug, Clone)]
pub enum FileScanBackend {
    S3,
    Gcs,
    Azblob,
}
167
168pub fn extract_bucket_and_file_name(
169 location: &str,
170 file_scan_backend: &FileScanBackend,
171) -> ConnectorResult<(String, String)> {
172 let url = Url::parse(location)?;
173 let bucket = url
174 .host_str()
175 .with_context(|| format!("Invalid url: {}, missing bucket", location))?
176 .to_owned();
177 let prefix = match file_scan_backend {
178 FileScanBackend::S3 => format!("s3://{}/", bucket),
179 FileScanBackend::Gcs => format!("gcs://{}/", bucket),
180 FileScanBackend::Azblob => format!("azblob://{}/", bucket),
181 };
182 let file_name = location[prefix.len()..].to_string();
183 Ok((bucket, file_name))
184}
185
186pub async fn list_data_directory(
187 op: Operator,
188 dir: String,
189 file_scan_backend: &FileScanBackend,
190) -> Result<Vec<String>, anyhow::Error> {
191 let (bucket, file_name) = extract_bucket_and_file_name(&dir, file_scan_backend)?;
192 let prefix = match file_scan_backend {
193 FileScanBackend::S3 => format!("s3://{}/", bucket),
194 FileScanBackend::Gcs => format!("gcs://{}/", bucket),
195 FileScanBackend::Azblob => format!("azblob://{}/", bucket),
196 };
197 if dir.starts_with(&prefix) {
198 op.list(&file_name).await.map_err(Into::into).map(|list| {
199 list.into_iter()
200 .map(|entry| prefix.clone() + entry.path())
201 .collect()
202 })
203 } else {
204 bail!("Invalid url: {}, should start with {}", dir, prefix)
205 }
206}
207
/// Builds a parquet `ProjectionMask` selecting the root columns that both
/// exist in the parquet file and whose converted arrow type matches the
/// corresponding RisingWave column type.
///
/// * `columns` - requested source columns; `None` selects every column.
/// * `metadata` - parquet file metadata providing the schema descriptor.
/// * `case_insensitive` - when true, fall back to an ASCII-case-insensitive
///   name match if no exact match is found.
pub fn get_project_mask(
    columns: Option<Vec<Column>>,
    metadata: &FileMetaData,
    case_insensitive: bool,
) -> ConnectorResult<ProjectionMask> {
    match columns {
        Some(rw_columns) => {
            // Names of the parquet root (top-level) columns, in schema order.
            let root_column_names = metadata
                .schema_descr()
                .get_fields()
                .iter()
                .map(|field| field.name())
                .collect_vec();

            let converted_arrow_schema =
                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
                    .map_err(anyhow::Error::from)?;
            // Lowercased name -> Some(index) for a unique match, or None as an
            // ambiguity sentinel when the lowercased name occurs more than once
            // (an ambiguous name must never match case-insensitively).
            let mut lowercase_name_to_index: HashMap<String, Option<usize>> = HashMap::new();
            if case_insensitive {
                for (index, name) in root_column_names.iter().enumerate() {
                    let key = name.to_ascii_lowercase();
                    match lowercase_name_to_index.get(&key) {
                        Some(Some(_)) => {
                            // Second occurrence: mark the name as ambiguous.
                            lowercase_name_to_index.insert(key, None);
                        }
                        Some(None) => {} // already marked ambiguous
                        None => {
                            lowercase_name_to_index.insert(key, Some(index));
                        }
                    }
                }
            }

            let valid_column_indices: Vec<usize> = rw_columns
                .iter()
                .filter_map(|column| {
                    // Prefer an exact (case-sensitive) match; only consult the
                    // case-insensitive map when allowed and no exact hit exists.
                    let exact_pos = root_column_names
                        .iter()
                        .position(|&name| name == column.name);
                    let pos = match (case_insensitive, exact_pos) {
                        (_, Some(pos)) => Some(pos),
                        (true, None) => lowercase_name_to_index
                            .get(&column.name.to_ascii_lowercase())
                            .copied()
                            .flatten(),
                        _ => None,
                    }?;
                    // Drop columns whose arrow type does not match the
                    // RisingWave column type.
                    let arrow_field = converted_arrow_schema.fields.get(pos)?;
                    let arrow_data_type = arrow_field.data_type();
                    let rw_data_type: &risingwave_common::types::DataType = &column.data_type;
                    if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) {
                        Some(pos)
                    } else {
                        None
                    }
                })
                .collect();

            Ok(ProjectionMask::roots(
                metadata.schema_descr(),
                valid_column_indices,
            ))
        }
        // No column list given: project everything.
        None => Ok(ProjectionMask::all()),
    }
}
291
/// Reads a parquet file through the given OpenDAL operator and returns a
/// boxed stream of `StreamChunk`s parsed from its record batches.
///
/// * `rw_columns` - columns used to build the projection mask; `None`
///   projects every column.
/// * `parser_columns` - column descriptors handed to the `ParquetParser`;
///   when `None` they are derived from the file's converted arrow schema.
/// * `case_insensitive` - whether column-name matching ignores ASCII case.
/// * `batch_size` - row-count cap per record batch.
/// * `offset` - number of leading rows to skip (resume position in the file).
/// * The two optional metric counters are forwarded into the parser stream.
pub async fn read_parquet_file(
    op: Operator,
    file_name: String,
    rw_columns: Option<Vec<Column>>,
    parser_columns: Option<Vec<SourceColumnDesc>>,
    case_insensitive: bool,
    batch_size: usize,
    offset: usize,
    file_source_input_row_count_metrics: Option<
        LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
    >,
    parquet_source_skip_row_count_metrics: Option<
        LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
    >,
) -> ConnectorResult<
    Pin<Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>>,
> {
    // Open the object and adapt it to a tokio-compatible async reader so the
    // parquet crate can drive it.
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future()
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    // Load the footer metadata up front; it drives the diagnostics and the
    // projection mask below.
    let parquet_metadata = reader
        .get_metadata(None)
        .await
        .map_err(anyhow::Error::from)?;

    let file_metadata = parquet_metadata.file_metadata();
    {
        // Diagnostic logging: file layout plus per-column physical/logical types.
        tracing::info!(
            "Reading parquet file: {}, from offset {}, num_row_groups={}, total_rows={}, kv_len={}",
            file_name,
            offset,
            parquet_metadata.row_groups().len(),
            file_metadata.num_rows(),
            file_metadata
                .key_value_metadata()
                .map(|m| m.len())
                .unwrap_or(0)
        );
        let schema_descr = file_metadata.schema_descr();
        for col in schema_descr.columns() {
            let path = col.path().string();
            let physical = col.physical_type();
            let logical = col.logical_type();
            tracing::debug!(
                file = %file_name,
                column_path = path,
                physical_type = ?physical,
                logical_type = ?logical,
                type_length = ?col.type_length(),
                max_def_level = col.max_def_level(),
                max_rep_level = col.max_rep_level(),
                "Parquet file column schema: "
            );
        }
    }
    // Only project the requested columns whose types match the source schema.
    let projection_mask = get_project_mask(rw_columns, file_metadata, case_insensitive)?;

    // Build the record-batch stream, skipping the first `offset` rows.
    let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(batch_size)
        .with_projection(projection_mask)
        .with_offset(offset)
        .build()?;
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )?;
    // Use the caller-provided parser columns, or derive them from the arrow
    // schema (one SourceColumnDesc per field, ids assigned by position).
    let columns = match parser_columns {
        Some(columns) => columns,
        None => converted_arrow_schema
            .fields
            .iter()
            .enumerate()
            .map(|(index, field_ref)| {
                let data_type = IcebergArrowConvert.type_from_field(field_ref).unwrap();
                let column_desc = ColumnDesc::named(
                    field_ref.name().clone(),
                    ColumnId::new(index as i32),
                    data_type,
                );
                SourceColumnDesc::from(&column_desc)
            })
            .collect(),
    };
    let parquet_parser = ParquetParser::new(columns, file_name, offset, case_insensitive)?;
    // Turn record batches into StreamChunks, wiring in the metric counters.
    let msg_stream: Pin<
        Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>,
    > = parquet_parser.into_stream(
        record_batch_stream,
        file_source_input_row_count_metrics,
        parquet_source_skip_row_count_metrics,
    );
    Ok(msg_stream)
}
395
396pub async fn get_parquet_fields(
397 op: Operator,
398 file_name: String,
399) -> ConnectorResult<risingwave_common::array::arrow::arrow_schema_iceberg::Fields> {
400 let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
401 .reader_with(&file_name)
402 .into_future() .await?
404 .into_futures_async_read(..)
405 .await?
406 .compat();
407 let parquet_metadata = reader.get_metadata(None).await?;
408
409 let file_metadata = parquet_metadata.file_metadata();
410 let converted_arrow_schema = parquet_to_arrow_schema(
411 file_metadata.schema_descr(),
412 file_metadata.key_value_metadata(),
413 )?;
414 let fields: risingwave_common::array::arrow::arrow_schema_iceberg::Fields =
415 converted_arrow_schema.fields;
416 Ok(fields)
417}