risingwave_batch_executors/executor/azblob_file_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use risingwave_common::array::DataChunk;
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_connector::source::iceberg::{
20    FileScanBackend, extract_bucket_and_file_name, new_azblob_operator, read_parquet_file,
21};
22use risingwave_pb::batch_plan::file_scan_node;
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24
25use crate::error::BatchError;
26use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
27
/// File formats that [`AzblobFileScanExecutor`] can read.
///
/// A fieldless enum, so it is cheap to copy and compare; `Clone`/`Copy`/`Eq` are derived
/// in addition to the original `PartialEq`/`Debug`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FileFormat {
    /// Apache Parquet files.
    Parquet,
}
32
33/// Azblob file scan executor. Currently only support parquet file format.
34pub struct AzblobFileScanExecutor {
35    file_format: FileFormat,
36    file_location: Vec<String>,
37    account_name: String,
38    account_key: String,
39    endpoint: String,
40    batch_size: usize,
41    schema: Schema,
42    identity: String,
43}
44
45impl Executor for AzblobFileScanExecutor {
46    fn schema(&self) -> &risingwave_common::catalog::Schema {
47        &self.schema
48    }
49
50    fn identity(&self) -> &str {
51        &self.identity
52    }
53
54    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
55        self.do_execute().boxed()
56    }
57}
58
59impl AzblobFileScanExecutor {
60    pub fn new(
61        file_format: FileFormat,
62        file_location: Vec<String>,
63        account_name: String,
64        account_key: String,
65        endpoint: String,
66        batch_size: usize,
67        schema: Schema,
68        identity: String,
69    ) -> Self {
70        Self {
71            file_format,
72            file_location,
73            account_name,
74            account_key,
75            endpoint,
76            batch_size,
77            schema,
78            identity,
79        }
80    }
81
82    #[try_stream(ok = DataChunk, error = BatchError)]
83    async fn do_execute(self: Box<Self>) {
84        assert_eq!(self.file_format, FileFormat::Parquet);
85        for file in self.file_location {
86            let (bucket, file_name) =
87                extract_bucket_and_file_name(&file, &FileScanBackend::Azblob)?;
88            let op = new_azblob_operator(
89                self.endpoint.clone(),
90                self.account_name.clone(),
91                self.account_key.clone(),
92                bucket.clone(),
93            )?;
94            let chunk_stream = read_parquet_file(
95                op,
96                file_name,
97                None,
98                None,
99                false,
100                self.batch_size,
101                0,
102                None,
103                None,
104            )
105            .await?;
106            #[for_await]
107            for stream_chunk in chunk_stream {
108                let stream_chunk = stream_chunk?;
109                let (data_chunk, _) = stream_chunk.into_parts();
110                yield data_chunk;
111            }
112        }
113    }
114}
115
116pub struct AzblobFileScanExecutorBuilder {}
117
118impl BoxedExecutorBuilder for AzblobFileScanExecutorBuilder {
119    async fn new_boxed_executor(
120        source: &ExecutorBuilder<'_>,
121        _inputs: Vec<BoxedExecutor>,
122    ) -> crate::error::Result<BoxedExecutor> {
123        let file_scan_node = try_match_expand!(
124            source.plan_node().get_node_body().unwrap(),
125            NodeBody::AzblobFileScan
126        )?;
127
128        Ok(Box::new(AzblobFileScanExecutor::new(
129            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
130                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
131                file_scan_node::FileFormat::Unspecified => unreachable!(),
132            },
133            file_scan_node.file_location.clone(),
134            file_scan_node.account_name.clone(),
135            file_scan_node.account_key.clone(),
136            file_scan_node.endpoint.clone(),
137            source.context().get_config().developer.chunk_size,
138            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
139            source.plan_node().get_identity().clone(),
140        )))
141    }
142}