risingwave_batch_executors/executor/azblob_file_scan.rs

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::{
    FileScanBackend, extract_bucket_and_file_name, new_azblob_operator, read_parquet_file,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
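/// File formats this executor can scan; only Parquet is supported.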
#[derive(PartialEq, Debug)]
pub enum FileFormat {
    Parquet,
}

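/// Batch executor that scans one or more Parquet files from Azure Blob
/// Storage and streams their rows as `DataChunk`s.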
pub struct AzblobFileScanExecutor {
    file_format: FileFormat,
    file_location: Vec<String>,
    account_name: String,
    account_key: String,
    endpoint: String,
    batch_size: usize,
    schema: Schema,
    identity: String,
}

impl Executor for AzblobFileScanExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl AzblobFileScanExecutor {
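    /// Creates an executor over the given file locations, authenticating to
    /// Azure Blob Storage with the shared-key pair `account_name` /
    /// `account_key` against `endpoint`.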
    pub fn new(
        file_format: FileFormat,
        file_location: Vec<String>,
        account_name: String,
        account_key: String,
        endpoint: String,
        batch_size: usize,
        schema: Schema,
        identity: String,
    ) -> Self {
        Self {
            file_format,
            file_location,
            account_name,
            account_key,
            endpoint,
            batch_size,
            schema,
            identity,
        }
    }
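    /// Reads each file sequentially and yields its contents as `DataChunk`s
    /// sized by `batch_size`.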
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        assert_eq!(self.file_format, FileFormat::Parquet);
        for file in self.file_location {
            // Split the file location into its bucket (container) and object path.
            let (bucket, file_name) =
                extract_bucket_and_file_name(&file, &FileScanBackend::Azblob)?;
            // Build a storage operator scoped to that container.
            let op = new_azblob_operator(
                self.endpoint.clone(),
                self.account_name.clone(),
                self.account_key.clone(),
                bucket.clone(),
            )?;
            let chunk_stream =
                read_parquet_file(op, file_name, None, None, self.batch_size, 0, None, None)
                    .await?;
            #[for_await]
            for stream_chunk in chunk_stream {
                let stream_chunk = stream_chunk?;
                // Keep only the data part of the stream chunk; batch
                // execution has no use for the rest.
                let (data_chunk, _) = stream_chunk.into_parts();
                yield data_chunk;
            }
        }
    }
}

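/// Builder that constructs an [`AzblobFileScanExecutor`] from an
/// `AzblobFileScan` plan node.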
pub struct AzblobFileScanExecutorBuilder {}

impl BoxedExecutorBuilder for AzblobFileScanExecutorBuilder {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        _inputs: Vec<BoxedExecutor>,
    ) -> crate::error::Result<BoxedExecutor> {
        // Bail out with an error unless the plan node body is an `AzblobFileScan`.
        let file_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::AzblobFileScan
        )?;

        Ok(Box::new(AzblobFileScanExecutor::new(
            // `Unspecified` never appears in a valid plan, hence `unreachable!()`.
            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
                file_scan_node::FileFormat::Unspecified => unreachable!(),
            },
            file_scan_node.file_location.clone(),
            file_scan_node.account_name.clone(),
            file_scan_node.account_key.clone(),
            file_scan_node.endpoint.clone(),
            // Chunk size comes from the batch developer config.
            source.context().get_config().developer.chunk_size,
            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
            source.plan_node().get_identity().clone(),
        )))
    }
}