risingwave_connector/source/iceberg/parquet_file_handler.rs

use std::collections::HashMap;
use std::future::IntoFuture;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

use anyhow::anyhow;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, TryFutureExt};
use iceberg::io::{
    FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use iceberg::{Error, ErrorKind};
use itertools::Itertools;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::{Azblob, Gcs, S3};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, parquet_to_arrow_schema};
use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader};
use prometheus::core::GenericCounter;
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
use url::Url;

use crate::error::ConnectorResult;
use crate::parser::ParquetParser;
use crate::source::{Column, SourceColumnDesc};

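/// A Parquet file reader that adapts an Iceberg `FileRead` to the `AsyncFileReader`
/// interface expected by `ParquetRecordBatchStreamBuilder`.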
pub struct ParquetFileReader<R: FileRead> {
    meta: FileMetadata,
    r: R,
}

impl<R: FileRead> ParquetFileReader<R> {
    pub fn new(meta: FileMetadata, r: R) -> Self {
        Self { meta, r }
    }
}

impl<R: FileRead> AsyncFileReader for ParquetFileReader<R> {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start as _..range.end as _)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new();
            let size = self.meta.size as usize;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}

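/// Builds a `ParquetRecordBatchStreamBuilder` for a Parquet file stored on S3,
/// using the Iceberg `FileIO` abstraction with the given region and credentials.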
pub async fn create_parquet_stream_builder(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    location: String,
) -> Result<ParquetRecordBatchStreamBuilder<ParquetFileReader<impl FileRead>>, anyhow::Error> {
    let mut props = HashMap::new();
    props.insert(S3_REGION, s3_region.clone());
    props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone());
    props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone());

    let file_io_builder = FileIOBuilder::new("s3");
    let file_io = file_io_builder
        .with_props(props.into_iter())
        .build()
        .map_err(|e| anyhow!(e))?;
    let parquet_file = file_io.new_input(&location).map_err(|e| anyhow!(e))?;

    let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?;
    let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?;
    let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);

    ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
        .await
        .map_err(|e| anyhow!(e))
}

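/// Creates an OpenDAL `Operator` for S3 with logging and retry layers.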
pub fn new_s3_operator(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    bucket: String,
    s3_endpoint: String,
) -> ConnectorResult<Operator> {
    let mut builder = S3::default();
    builder = builder
        .region(&s3_region)
        .endpoint(&s3_endpoint)
        .access_key_id(&s3_access_key)
        .secret_access_key(&s3_secret_key)
        .bucket(&bucket)
        .disable_config_load();
    let op: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();

    Ok(op)
}

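/// Creates an OpenDAL `Operator` for GCS with logging and retry layers.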
pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult<Operator> {
    let builder = Gcs::default().bucket(&bucket).credential(&credential);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

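/// Creates an OpenDAL `Operator` for Azure Blob Storage with logging and retry layers.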
pub fn new_azblob_operator(
    endpoint: String,
    account_name: String,
    account_key: String,
    container_name: String,
) -> ConnectorResult<Operator> {
    let mut builder = Azblob::default();
    builder = builder
        .container(&container_name)
        .endpoint(&endpoint)
        .account_name(&account_name)
        .account_key(&account_key);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

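/// The object-store backend a file scan reads from.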
#[derive(Debug, Clone)]
pub enum FileScanBackend {
    S3,
    Gcs,
    Azblob,
}

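/// Splits an object-store URL into the bucket and the object path after the bucket.
/// For example, `s3://bucket/path/file.parquet` with the S3 backend yields
/// `("bucket", "path/file.parquet")`.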
pub fn extract_bucket_and_file_name(
    location: &str,
    file_scan_backend: &FileScanBackend,
) -> ConnectorResult<(String, String)> {
    let url = Url::parse(location)?;
    let bucket = url
        .host_str()
        .ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                format!("Invalid url: {}, missing bucket", location),
            )
        })?
        .to_owned();
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    let file_name = location[prefix.len()..].to_string();
    Ok((bucket, file_name))
}

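/// Lists the entries under `dir` via the operator and returns their full URLs
/// (scheme + bucket + path). Returns an error if `dir` does not start with the
/// expected backend prefix.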
pub async fn list_data_directory(
    op: Operator,
    dir: String,
    file_scan_backend: &FileScanBackend,
) -> Result<Vec<String>, anyhow::Error> {
    let (bucket, file_name) = extract_bucket_and_file_name(&dir, file_scan_backend)?;
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    if dir.starts_with(&prefix) {
        op.list(&file_name)
            .await
            .map_err(|e| anyhow!(e))
            .map(|list| {
                list.into_iter()
                    .map(|entry| prefix.clone() + entry.path())
                    .collect()
            })
    } else {
        Err(Error::new(
            ErrorKind::DataInvalid,
            format!("Invalid url: {}, should start with {}", dir, prefix),
        ))?
    }
}

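/// Builds a `ProjectionMask` that selects only the root Parquet columns whose names
/// and types match the requested source columns; when no columns are requested, all
/// columns are projected.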
pub fn get_project_mask(
    columns: Option<Vec<Column>>,
    metadata: &FileMetaData,
) -> ConnectorResult<ProjectionMask> {
    match columns {
        Some(rw_columns) => {
            let root_column_names = metadata
                .schema_descr()
                .root_schema()
                .get_fields()
                .iter()
                .map(|field| field.name())
                .collect_vec();

            let converted_arrow_schema =
                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
                    .map_err(anyhow::Error::from)?;
            let valid_column_indices: Vec<usize> = rw_columns
                .iter()
                .filter_map(|column| {
                    root_column_names
                        .iter()
                        .position(|&name| name == column.name)
                        .and_then(|pos| {
                            let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_iceberg::DataType = converted_arrow_schema.field_with_name(&column.name).ok()?.data_type();
                            let rw_data_type: &risingwave_common::types::DataType = &column.data_type;
                            if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) {
                                Some(pos)
                            } else {
                                None
                            }
                        })
                })
                .collect();

            Ok(ProjectionMask::roots(
                metadata.schema_descr(),
                valid_column_indices,
            ))
        }
        None => Ok(ProjectionMask::all()),
    }
}

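/// Reads a Parquet file through the given operator and returns a stream of `StreamChunk`s,
/// skipping the first `offset` rows and projecting only the columns that match the source
/// schema. If `parser_columns` is not provided, the columns are derived from the Parquet schema.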
pub async fn read_parquet_file(
    op: Operator,
    file_name: String,
    rw_columns: Option<Vec<Column>>,
    parser_columns: Option<Vec<SourceColumnDesc>>,
    batch_size: usize,
    offset: usize,
    file_source_input_row_count_metrics: Option<
        LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
    >,
    parquet_source_skip_row_count_metrics: Option<
        LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
    >,
) -> ConnectorResult<
    Pin<Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>>,
> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future()
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;

    let file_metadata = parquet_metadata.file_metadata();
    let projection_mask = get_project_mask(rw_columns, file_metadata)?;

    let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(batch_size)
        .with_projection(projection_mask)
        .with_offset(offset)
        .build()?;
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )
    .map_err(anyhow::Error::from)?;
    let columns = match parser_columns {
        Some(columns) => columns,
        None => converted_arrow_schema
            .fields
            .iter()
            .enumerate()
            .map(|(index, field_ref)| {
                let data_type = IcebergArrowConvert.type_from_field(field_ref).unwrap();
                let column_desc = ColumnDesc::named(
                    field_ref.name().clone(),
                    ColumnId::new(index as i32),
                    data_type,
                );
                SourceColumnDesc::from(&column_desc)
            })
            .collect(),
    };
    let parquet_parser = ParquetParser::new(columns, file_name, offset)?;
    let msg_stream: Pin<
        Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>,
    > = parquet_parser.into_stream(
        record_batch_stream,
        file_source_input_row_count_metrics,
        parquet_source_skip_row_count_metrics,
    );
    Ok(msg_stream)
}

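/// Reads only the Parquet metadata of a file and returns its schema converted to Arrow fields.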
pub async fn get_parquet_fields(
    op: Operator,
    file_name: String,
) -> ConnectorResult<risingwave_common::array::arrow::arrow_schema_iceberg::Fields> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future()
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;

    let file_metadata = parquet_metadata.file_metadata();
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )
    .map_err(anyhow::Error::from)?;
    let fields: risingwave_common::array::arrow::arrow_schema_iceberg::Fields =
        converted_arrow_schema.fields;
    Ok(fields)
}