// risingwave_batch_executors/executor/azblob_file_scan.rs

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::{
    FileScanBackend, extract_bucket_and_file_name, new_azblob_operator, read_parquet_file,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};

/// File formats supported by the Azblob file scan executor.
///
/// Currently only Parquet is supported; `do_execute` asserts this invariant.
/// Derives `Copy`/`Clone`/`Eq` in addition to the original `PartialEq`/`Debug`:
/// a fieldless enum is trivially copyable, and `Eq` follows from `PartialEq`
/// here — both are backward-compatible additions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileFormat {
    /// Apache Parquet columnar file format.
    Parquet,
}
32
/// Batch executor that scans files stored in Azure Blob Storage and yields
/// their contents as [`DataChunk`]s.
///
/// Only Parquet files are supported today (asserted in `do_execute`).
pub struct AzblobFileScanExecutor {
    // Format of the files to scan; must be `FileFormat::Parquet`.
    file_format: FileFormat,
    // Full file locations; each is split into (bucket/container, file name)
    // by `extract_bucket_and_file_name` at execution time.
    file_location: Vec<String>,
    // Azure storage account name used to build the object-store operator.
    account_name: String,
    // Azure storage account key (credential) for the same account.
    account_key: String,
    // Azblob service endpoint URL.
    endpoint: String,
    // Upper bound on the number of rows per yielded `DataChunk`.
    batch_size: usize,
    // Output schema of the scan, reported via `Executor::schema`.
    schema: Schema,
    // Human-readable executor identity, reported via `Executor::identity`.
    identity: String,
}
44
45impl Executor for AzblobFileScanExecutor {
46 fn schema(&self) -> &risingwave_common::catalog::Schema {
47 &self.schema
48 }
49
50 fn identity(&self) -> &str {
51 &self.identity
52 }
53
54 fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
55 self.do_execute().boxed()
56 }
57}
58
impl AzblobFileScanExecutor {
    /// Plain constructor; stores all arguments without validation.
    /// See the struct fields for the meaning of each parameter.
    pub fn new(
        file_format: FileFormat,
        file_location: Vec<String>,
        account_name: String,
        account_key: String,
        endpoint: String,
        batch_size: usize,
        schema: Schema,
        identity: String,
    ) -> Self {
        Self {
            file_format,
            file_location,
            account_name,
            account_key,
            endpoint,
            batch_size,
            schema,
            identity,
        }
    }

    /// Stream [`DataChunk`]s by reading each configured file in order.
    ///
    /// For every file location we parse out the bucket/container, build a
    /// fresh Azblob operator scoped to that bucket, and stream the file's
    /// rows in chunks of at most `batch_size` rows. Any parse, operator, or
    /// read error is surfaced as a `BatchError` via the `try_stream` machinery.
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        // Invariant: the planner only emits Parquet for Azblob file scans.
        assert_eq!(self.file_format, FileFormat::Parquet);
        for file in self.file_location {
            let (bucket, file_name) =
                extract_bucket_and_file_name(&file, &FileScanBackend::Azblob)?;
            // The bucket may differ per file, so the operator is rebuilt
            // on each iteration; credentials/endpoint are cloned because the
            // operator constructor takes owned Strings.
            let op = new_azblob_operator(
                self.endpoint.clone(),
                self.account_name.clone(),
                self.account_key.clone(),
                bucket.clone(),
            )?;
            // NOTE(review): the `None`/`false`/`0` arguments presumably mean
            // "no projection, no predicate, start at row offset 0" — confirm
            // against `read_parquet_file`'s signature.
            let chunk_stream = read_parquet_file(
                op,
                file_name,
                None,
                None,
                false,
                self.batch_size,
                0,
                None,
                None,
            )
            .await?;
            #[for_await]
            for stream_chunk in chunk_stream {
                let stream_chunk = stream_chunk?;
                // Keep only the data columns; the second part of the split
                // (ops/visibility) is not needed for a batch scan.
                let (data_chunk, _) = stream_chunk.into_parts();
                yield data_chunk;
            }
        }
    }
}
115
/// Stateless builder that constructs an `AzblobFileScanExecutor` from a plan
/// node. Declared as a unit struct; `AzblobFileScanExecutorBuilder {}` remains
/// a valid constructor expression for existing callers.
pub struct AzblobFileScanExecutorBuilder;
117
impl BoxedExecutorBuilder for AzblobFileScanExecutorBuilder {
    /// Build a boxed [`AzblobFileScanExecutor`] from an `AzblobFileScan`
    /// plan node.
    ///
    /// `_inputs` is ignored: a file scan is a leaf executor with no children.
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        _inputs: Vec<BoxedExecutor>,
    ) -> crate::error::Result<BoxedExecutor> {
        // Extract the typed plan-node body; `try_match_expand!` errors if the
        // node is not an AzblobFileScan. The `unwrap` assumes the node body
        // is always populated by the planner.
        let file_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::AzblobFileScan
        )?;

        Ok(Box::new(AzblobFileScanExecutor::new(
            // Map the protobuf file-format enum onto the local one. The value
            // comes from our own planner, so an out-of-range value (`unwrap`)
            // or `Unspecified` (`unreachable!`) is treated as a planner bug.
            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
                file_scan_node::FileFormat::Unspecified => unreachable!(),
            },
            file_scan_node.file_location.clone(),
            file_scan_node.account_name.clone(),
            file_scan_node.account_key.clone(),
            file_scan_node.endpoint.clone(),
            // Chunk size comes from the runtime config, not the plan node.
            source.context().get_config().developer.chunk_size,
            // Build the output schema from the plan node's column descriptors.
            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
            source.plan_node().get_identity().clone(),
        )))
    }
}