// risingwave_connector/source/iceberg/parquet_file_handler.rs

use std::collections::HashMap;
use std::future::IntoFuture;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

use anyhow::anyhow;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, TryFutureExt};
use iceberg::io::{
    FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use iceberg::{Error, ErrorKind};
use itertools::Itertools;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::{Azblob, Gcs, S3};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, parquet_to_arrow_schema};
use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
use url::Url;

use crate::error::ConnectorResult;
use crate::parser::ParquetParser;
use crate::source::{Column, SourceColumnDesc};

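/// Adapter that implements parquet's [`AsyncFileReader`] on top of an iceberg
/// [`FileRead`], so that parquet files opened through iceberg's `FileIO` can be
/// fed to [`ParquetRecordBatchStreamBuilder`].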
pub struct ParquetFileReader<R: FileRead> {
    meta: FileMetadata,
    r: R,
}

impl<R: FileRead> ParquetFileReader<R> {
    pub fn new(meta: FileMetadata, r: R) -> Self {
        Self { meta, r }
    }
}

impl<R: FileRead> AsyncFileReader for ParquetFileReader<R> {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start as _..range.end as _)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new();
            let size = self.meta.size as usize;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}

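/// Builds a [`ParquetRecordBatchStreamBuilder`] for a parquet file on S3, going
/// through iceberg's `FileIO` with static credentials.
///
/// Illustrative usage; every argument value below is a placeholder:
///
/// ```ignore
/// let builder = create_parquet_stream_builder(
///     "us-east-1".to_owned(),
///     "my-access-key".to_owned(),
///     "my-secret-key".to_owned(),
///     "s3://my-bucket/path/to/file.parquet".to_owned(),
/// )
/// .await?;
/// let record_batch_stream = builder.build()?;
/// ```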
pub async fn create_parquet_stream_builder(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    location: String,
) -> Result<ParquetRecordBatchStreamBuilder<ParquetFileReader<impl FileRead>>, anyhow::Error> {
    let mut props = HashMap::new();
    props.insert(S3_REGION, s3_region.clone());
    props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone());
    props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone());

    let file_io_builder = FileIOBuilder::new("s3");
    let file_io = file_io_builder
        .with_props(props.into_iter())
        .build()
        .map_err(|e| anyhow!(e))?;
    let parquet_file = file_io.new_input(&location).map_err(|e| anyhow!(e))?;

    let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?;
    let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?;
    let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);

    ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
        .await
        .map_err(|e| anyhow!(e))
}

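/// Creates an opendal [`Operator`] for S3 with logging and retry layers
/// attached. Config loading from the environment is disabled, so only the
/// explicitly passed credentials are used.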
pub fn new_s3_operator(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    bucket: String,
    s3_endpoint: String,
) -> ConnectorResult<Operator> {
    let mut builder = S3::default();
    builder = builder
        .region(&s3_region)
        .endpoint(&s3_endpoint)
        .access_key_id(&s3_access_key)
        .secret_access_key(&s3_secret_key)
        .bucket(&bucket)
        .disable_config_load();
    let op: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();

    Ok(op)
}

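/// Creates an opendal [`Operator`] for GCS with logging and retry layers
/// attached.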
pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult<Operator> {
    let builder = Gcs::default().bucket(&bucket).credential(&credential);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

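/// Creates an opendal [`Operator`] for Azure Blob Storage with logging and
/// retry layers attached.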
pub fn new_azblob_operator(
    endpoint: String,
    account_name: String,
    account_key: String,
    container_name: String,
) -> ConnectorResult<Operator> {
    let mut builder = Azblob::default();
    builder = builder
        .container(&container_name)
        .endpoint(&endpoint)
        .account_name(&account_name)
        .account_key(&account_key);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

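/// The object-store backend that a file-scan location refers to.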
#[derive(Debug, Clone)]
pub enum FileScanBackend {
    S3,
    Gcs,
    Azblob,
}

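/// Splits a location URL into the bucket (or container) name and the
/// bucket-relative file path. For example, for the S3 backend,
/// `s3://my-bucket/a/b.parquet` yields `("my-bucket", "a/b.parquet")`.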
pub fn extract_bucket_and_file_name(
    location: &str,
    file_scan_backend: &FileScanBackend,
) -> ConnectorResult<(String, String)> {
    let url = Url::parse(location)?;
    let bucket = url
        .host_str()
        .ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                format!("Invalid url: {}, missing bucket", location),
            )
        })?
        .to_owned();
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    let file_name = location[prefix.len()..].to_string();
    Ok((bucket, file_name))
}

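/// Lists all entries under `dir` and returns their full locations, i.e. the
/// `<scheme>://<bucket>/` prefix rejoined with each entry's path. Errors if
/// `dir` does not start with the prefix expected for the chosen backend.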
pub async fn list_data_directory(
    op: Operator,
    dir: String,
    file_scan_backend: &FileScanBackend,
) -> Result<Vec<String>, anyhow::Error> {
    let (bucket, file_name) = extract_bucket_and_file_name(&dir, file_scan_backend)?;
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    if dir.starts_with(&prefix) {
        op.list(&file_name)
            .await
            .map_err(|e| anyhow!(e))
            .map(|list| {
                list.into_iter()
                    .map(|entry| prefix.clone() + entry.path())
                    .collect()
            })
    } else {
        Err(Error::new(
            ErrorKind::DataInvalid,
            format!("Invalid url: {}, should start with {}", dir, prefix),
        ))?
    }
}

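/// Builds a [`ProjectionMask`] over the parquet root schema that keeps only the
/// requested columns whose arrow types are compatible with their declared
/// source types. With `columns == None`, all columns are selected.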
pub fn get_project_mask(
    columns: Option<Vec<Column>>,
    metadata: &FileMetaData,
) -> ConnectorResult<ProjectionMask> {
    match columns {
        Some(rw_columns) => {
            let root_column_names = metadata
                .schema_descr()
                .root_schema()
                .get_fields()
                .iter()
                .map(|field| field.name())
                .collect_vec();

            let converted_arrow_schema =
                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
                    .map_err(anyhow::Error::from)?;
            let valid_column_indices: Vec<usize> = rw_columns
                .iter()
                .filter_map(|column| {
                    root_column_names
                        .iter()
                        .position(|&name| name == column.name)
                        .and_then(|pos| {
                            // Keep the column only if its parquet (arrow) type is
                            // compatible with the declared source type.
                            let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_iceberg::DataType =
                                converted_arrow_schema
                                    .field_with_name(&column.name)
                                    .ok()?
                                    .data_type();
                            let rw_data_type: &risingwave_common::types::DataType =
                                &column.data_type;
                            if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) {
                                Some(pos)
                            } else {
                                None
                            }
                        })
                })
                .collect();

            Ok(ProjectionMask::roots(
                metadata.schema_descr(),
                valid_column_indices,
            ))
        }
        None => Ok(ProjectionMask::all()),
    }
}

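/// Reads a parquet file through the given opendal [`Operator`] and converts it
/// into a stream of [`StreamChunk`]s, starting `offset` rows into the file.
/// Requested columns whose names or types do not match the parquet schema are
/// pruned via [`get_project_mask`]; when `parser_columns` is `None`, column
/// descriptors are derived from the parquet schema. The optional counters are
/// passed through to the parser to record input and skipped row counts.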
pub async fn read_parquet_file(
    op: Operator,
    file_name: String,
    rw_columns: Option<Vec<Column>>,
    parser_columns: Option<Vec<SourceColumnDesc>>,
    batch_size: usize,
    offset: usize,
    file_source_input_row_count_metrics: Option<
        risingwave_common::metrics::LabelGuardedMetric<
            prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
            4,
        >,
    >,
    parquet_source_skip_row_count_metrics: Option<
        risingwave_common::metrics::LabelGuardedMetric<
            prometheus::core::GenericCounter<prometheus::core::AtomicU64>,
            4,
        >,
    >,
) -> ConnectorResult<
    Pin<Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>>,
> {
    // Open the file via opendal and adapt it to a tokio-compatible async reader.
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future()
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;

    let file_metadata = parquet_metadata.file_metadata();
    let projection_mask = get_project_mask(rw_columns, file_metadata)?;

    // Decode only the projected columns, in batches of `batch_size`, skipping
    // the first `offset` rows.
    let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(batch_size)
        .with_projection(projection_mask)
        .with_offset(offset)
        .build()?;
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )
    .map_err(anyhow::Error::from)?;
    // If no parser columns are provided, derive the column descriptors from the
    // parquet schema itself.
    let columns = match parser_columns {
        Some(columns) => columns,
        None => converted_arrow_schema
            .fields
            .iter()
            .enumerate()
            .map(|(index, field_ref)| {
                let data_type = IcebergArrowConvert.type_from_field(field_ref).unwrap();
                let column_desc = ColumnDesc::named(
                    field_ref.name().clone(),
                    ColumnId::new(index as i32),
                    data_type,
                );
                SourceColumnDesc::from(&column_desc)
            })
            .collect(),
    };
    let parquet_parser = ParquetParser::new(columns, file_name, offset)?;
    let msg_stream: Pin<
        Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>,
    > = parquet_parser.into_stream(
        record_batch_stream,
        file_source_input_row_count_metrics,
        parquet_source_skip_row_count_metrics,
    );
    Ok(msg_stream)
}

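/// Loads only the parquet metadata of `file_name` and returns the file's schema
/// as arrow [`Fields`], without decoding any row data.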
pub async fn get_parquet_fields(
    op: Operator,
    file_name: String,
) -> ConnectorResult<risingwave_common::array::arrow::arrow_schema_iceberg::Fields> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future()
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;

    let file_metadata = parquet_metadata.file_metadata();
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )
    .map_err(anyhow::Error::from)?;
    let fields: risingwave_common::array::arrow::arrow_schema_iceberg::Fields =
        converted_arrow_schema.fields;
    Ok(fields)
}