risingwave_connector/source/iceberg/parquet_file_handler.rs

use std::collections::HashMap;
use std::future::IntoFuture;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

use anyhow::{Context, bail};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, TryFutureExt};
use iceberg::io::{
    FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use itertools::Itertools;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::{Azblob, Gcs, S3};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, parquet_to_arrow_schema};
use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader};
use prometheus::core::GenericCounter;
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::{IcebergArrowConvert, is_parquet_schema_match_source_schema};
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
use url::Url;

use crate::error::ConnectorResult;
use crate::parser::ParquetParser;
use crate::source::{Column, SourceColumnDesc};

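/// Wraps an Iceberg `FileRead` together with its `FileMetadata` so the file can be
/// consumed through the parquet crate's `AsyncFileReader` interface.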
pub struct ParquetFileReader<R: FileRead> {
    meta: FileMetadata,
    r: R,
}

impl<R: FileRead> ParquetFileReader<R> {
    pub fn new(meta: FileMetadata, r: R) -> Self {
        Self { meta, r }
    }
}

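// `AsyncFileReader` implementation: byte-range reads are delegated to the inner
// `FileRead`, and the Parquet footer is loaded on demand via `ParquetMetaDataReader`.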
impl<R: FileRead> AsyncFileReader for ParquetFileReader<R> {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(
            self.r
                .read(range.start as _..range.end as _)
                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
        )
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        async move {
            let reader = ParquetMetaDataReader::new();
            let size = self.meta.size as usize;
            let meta = reader.load_and_finish(self, size).await?;

            Ok(Arc::new(meta))
        }
        .boxed()
    }
}

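/// Builds a `ParquetRecordBatchStreamBuilder` for a Parquet file on S3, using the
/// Iceberg `FileIO` stack (rather than OpenDAL) to open and read the object at
/// `location`.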
pub async fn create_parquet_stream_builder(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    location: String,
) -> Result<ParquetRecordBatchStreamBuilder<ParquetFileReader<impl FileRead>>, anyhow::Error> {
    let mut props = HashMap::new();
    props.insert(S3_REGION, s3_region.clone());
    props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone());
    props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone());

    let file_io_builder = FileIOBuilder::new("s3");
    let file_io = file_io_builder.with_props(props.into_iter()).build()?;
    let parquet_file = file_io.new_input(&location)?;

    let parquet_metadata = parquet_file.metadata().await?;
    let parquet_reader = parquet_file.reader().await?;
    let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);

    ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
        .await
        .map_err(Into::into)
}

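/// Creates an OpenDAL `Operator` for an S3 bucket with logging and retry layers.
/// `disable_config_load` keeps the builder from picking up ambient AWS configuration,
/// so only the explicitly supplied credentials are used.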
pub fn new_s3_operator(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    bucket: String,
    s3_endpoint: String,
) -> ConnectorResult<Operator> {
    let mut builder = S3::default();
    builder = builder
        .region(&s3_region)
        .endpoint(&s3_endpoint)
        .access_key_id(&s3_access_key)
        .secret_access_key(&s3_secret_key)
        .bucket(&bucket)
        .disable_config_load();
    let op: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();

    Ok(op)
}

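/// Creates an OpenDAL `Operator` for a GCS bucket from a credential string, with
/// logging and retry layers.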
pub fn new_gcs_operator(credential: String, bucket: String) -> ConnectorResult<Operator> {
    let builder = Gcs::default().bucket(&bucket).credential(&credential);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

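/// Creates an OpenDAL `Operator` for an Azure Blob Storage container, with logging
/// and retry layers.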
pub fn new_azblob_operator(
    endpoint: String,
    account_name: String,
    account_key: String,
    container_name: String,
) -> ConnectorResult<Operator> {
    let mut builder = Azblob::default();
    builder = builder
        .container(&container_name)
        .endpoint(&endpoint)
        .account_name(&account_name)
        .account_key(&account_key);

    let operator: Operator = Operator::new(builder)?
        .layer(LoggingLayer::default())
        .layer(RetryLayer::default())
        .finish();
    Ok(operator)
}

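/// Object-store backend of a file scan; determines the URL scheme used when parsing
/// and reconstructing file locations.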
#[derive(Debug, Clone)]
pub enum FileScanBackend {
    S3,
    Gcs,
    Azblob,
}

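/// Splits an object-store URL into `(bucket, object_key)` based on the backend's
/// URL scheme.
///
/// Illustrative usage (the location below is a made-up example):
///
/// ```ignore
/// let (bucket, file_name) = extract_bucket_and_file_name(
///     "s3://example-bucket/dir/data.parquet",
///     &FileScanBackend::S3,
/// )?;
/// assert_eq!(bucket, "example-bucket");
/// assert_eq!(file_name, "dir/data.parquet");
/// ```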
pub fn extract_bucket_and_file_name(
    location: &str,
    file_scan_backend: &FileScanBackend,
) -> ConnectorResult<(String, String)> {
    let url = Url::parse(location)?;
    let bucket = url
        .host_str()
        .with_context(|| format!("Invalid url: {}, missing bucket", location))?
        .to_owned();
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    let file_name = location[prefix.len()..].to_string();
    Ok((bucket, file_name))
}

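/// Lists the objects under `dir` (a full `scheme://bucket/path/` URL) and returns
/// their locations as full URLs with the same scheme and bucket.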
pub async fn list_data_directory(
    op: Operator,
    dir: String,
    file_scan_backend: &FileScanBackend,
) -> Result<Vec<String>, anyhow::Error> {
    let (bucket, file_name) = extract_bucket_and_file_name(&dir, file_scan_backend)?;
    let prefix = match file_scan_backend {
        FileScanBackend::S3 => format!("s3://{}/", bucket),
        FileScanBackend::Gcs => format!("gcs://{}/", bucket),
        FileScanBackend::Azblob => format!("azblob://{}/", bucket),
    };
    if dir.starts_with(&prefix) {
        op.list(&file_name).await.map_err(Into::into).map(|list| {
            list.into_iter()
                .map(|entry| prefix.clone() + entry.path())
                .collect()
        })
    } else {
        bail!("Invalid url: {}, should start with {}", dir, prefix)
    }
}

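/// Builds a `ProjectionMask` that selects only the requested columns.
///
/// For every requested column, the Parquet root schema is searched for a field with
/// the same name, and the match is kept only if its Arrow type is compatible with
/// the RisingWave type (`is_parquet_schema_match_source_schema`). With no requested
/// columns, all columns are projected.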
pub fn get_project_mask(
    columns: Option<Vec<Column>>,
    metadata: &FileMetaData,
) -> ConnectorResult<ProjectionMask> {
    match columns {
        Some(rw_columns) => {
            let root_column_names = metadata
                .schema_descr()
                .root_schema()
                .get_fields()
                .iter()
                .map(|field| field.name())
                .collect_vec();

            let converted_arrow_schema =
                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
                    .map_err(anyhow::Error::from)?;
            let valid_column_indices: Vec<usize> = rw_columns
                .iter()
                .filter_map(|column| {
                    root_column_names
                        .iter()
                        .position(|&name| name == column.name)
                        .and_then(|pos| {
                            let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_iceberg::DataType = converted_arrow_schema.field_with_name(&column.name).ok()?.data_type();
                            let rw_data_type: &risingwave_common::types::DataType = &column.data_type;
                            if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) {
                                Some(pos)
                            } else {
                                None
                            }
                        })
                })
                .collect();

            Ok(ProjectionMask::roots(
                metadata.schema_descr(),
                valid_column_indices,
            ))
        }
        None => Ok(ProjectionMask::all()),
    }
}

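/// Reads a Parquet file through `op` and converts it into a stream of `StreamChunk`s.
///
/// Columns are projected down to those in `rw_columns` whose types match the file
/// schema, rows before `offset` are skipped, and batches of `batch_size` rows are fed
/// through a `ParquetParser`. The optional counters track input rows and skipped rows.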
pub async fn read_parquet_file(
    op: Operator,
    file_name: String,
    rw_columns: Option<Vec<Column>>,
    parser_columns: Option<Vec<SourceColumnDesc>>,
    batch_size: usize,
    offset: usize,
    file_source_input_row_count_metrics: Option<
        LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
    >,
    parquet_source_skip_row_count_metrics: Option<
        LabelGuardedMetric<GenericCounter<prometheus::core::AtomicU64>>,
    >,
) -> ConnectorResult<
    Pin<Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>>,
> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future()
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;

    let file_metadata = parquet_metadata.file_metadata();
    let projection_mask = get_project_mask(rw_columns, file_metadata)?;

    let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_batch_size(batch_size)
        .with_projection(projection_mask)
        .with_offset(offset)
        .build()?;
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )?;
    let columns = match parser_columns {
        Some(columns) => columns,
        None => converted_arrow_schema
            .fields
            .iter()
            .enumerate()
            .map(|(index, field_ref)| {
                let data_type = IcebergArrowConvert.type_from_field(field_ref).unwrap();
                let column_desc = ColumnDesc::named(
                    field_ref.name().clone(),
                    ColumnId::new(index as i32),
                    data_type,
                );
                SourceColumnDesc::from(&column_desc)
            })
            .collect(),
    };
    let parquet_parser = ParquetParser::new(columns, file_name, offset)?;
    let msg_stream: Pin<
        Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>,
    > = parquet_parser.into_stream(
        record_batch_stream,
        file_source_input_row_count_metrics,
        parquet_source_skip_row_count_metrics,
    );
    Ok(msg_stream)
}

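/// Reads only the Parquet footer of `file_name` and returns the file's schema as
/// Arrow `Fields`, without decoding any row data.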
pub async fn get_parquet_fields(
    op: Operator,
    file_name: String,
) -> ConnectorResult<risingwave_common::array::arrow::arrow_schema_iceberg::Fields> {
    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
        .reader_with(&file_name)
        .into_future()
        .await?
        .into_futures_async_read(..)
        .await?
        .compat();
    let parquet_metadata = reader.get_metadata().await?;

    let file_metadata = parquet_metadata.file_metadata();
    let converted_arrow_schema = parquet_to_arrow_schema(
        file_metadata.schema_descr(),
        file_metadata.key_value_metadata(),
    )?;
    let fields: risingwave_common::array::arrow::arrow_schema_iceberg::Fields =
        converted_arrow_schema.fields;
    Ok(fields)
}