risingwave_batch_executors/executor/azblob_file_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

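//! Batch executor that scans Parquet files stored in Azure Blob Storage: each file
//! location is opened through an Azblob operator and its rows are streamed out as
//! [`DataChunk`]s.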
use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::{
    FileScanBackend, extract_bucket_and_file_name, new_azblob_operator, read_parquet_file,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};

#[derive(PartialEq, Debug)]
pub enum FileFormat {
    Parquet,
}

/// Azblob file scan executor. Currently only supports the Parquet file format.
pub struct AzblobFileScanExecutor {
    file_format: FileFormat,
    file_location: Vec<String>,
    account_name: String,
    account_key: String,
    endpoint: String,
    batch_size: usize,
    schema: Schema,
    identity: String,
}

impl Executor for AzblobFileScanExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl AzblobFileScanExecutor {
    pub fn new(
        file_format: FileFormat,
        file_location: Vec<String>,
        account_name: String,
        account_key: String,
        endpoint: String,
        batch_size: usize,
        schema: Schema,
        identity: String,
    ) -> Self {
        Self {
            file_format,
            file_location,
            account_name,
            account_key,
            endpoint,
            batch_size,
            schema,
            identity,
        }
    }

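    /// Scans each file in `file_location` in order and yields its rows as [`DataChunk`]s.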
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        assert_eq!(self.file_format, FileFormat::Parquet);
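        // Scan files one by one: each iteration builds a fresh Azblob operator scoped to
        // the file's bucket and streams that file's contents.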
        for file in self.file_location {
            let (bucket, file_name) =
                extract_bucket_and_file_name(&file, &FileScanBackend::Azblob)?;
            let op = new_azblob_operator(
                self.endpoint.clone(),
                self.account_name.clone(),
                self.account_key.clone(),
                bucket.clone(),
            )?;
            // Read the Parquet file as a stream of chunks of at most `batch_size` rows.
            let chunk_stream =
                read_parquet_file(op, file_name, None, None, self.batch_size, 0, None, None)
                    .await?;
            #[for_await]
            for stream_chunk in chunk_stream {
                let stream_chunk = stream_chunk?;
                // Discard the stream ops; only the data part is needed for batch output.
                let (data_chunk, _) = stream_chunk.into_parts();
                yield data_chunk;
            }
        }
    }
}

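/// Builds an [`AzblobFileScanExecutor`] from an `AzblobFileScan` plan node.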
pub struct AzblobFileScanExecutorBuilder {}

impl BoxedExecutorBuilder for AzblobFileScanExecutorBuilder {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        _inputs: Vec<BoxedExecutor>,
    ) -> crate::error::Result<BoxedExecutor> {
        let file_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::AzblobFileScan
        )?;

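        // Map the proto file format onto the executor's `FileFormat` and assemble the
        // executor; the batch size comes from the developer `chunk_size` config.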
        Ok(Box::new(AzblobFileScanExecutor::new(
            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
                file_scan_node::FileFormat::Unspecified => unreachable!(),
            },
            file_scan_node.file_location.clone(),
            file_scan_node.account_name.clone(),
            file_scan_node.account_key.clone(),
            file_scan_node.endpoint.clone(),
            source.context().get_config().developer.chunk_size,
            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
            source.plan_node().get_identity().clone(),
        )))
    }
}
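
// A hypothetical construction sketch (not part of the upstream file), e.g. for a unit test.
// The URL scheme, credentials, endpoint, and schema below are illustrative placeholders,
// and `risingwave_common::types::DataType` would need to be in scope:
//
// let executor = AzblobFileScanExecutor::new(
//     FileFormat::Parquet,
//     vec!["azblob://my-container/path/data.parquet".to_owned()],
//     "my_account".to_owned(),
//     "my_account_key".to_owned(),
//     "https://my_account.blob.core.windows.net".to_owned(),
//     1024,
//     Schema::from_iter([Field::with_name(DataType::Int32, "v1")]),
//     "AzblobFileScanExecutor".to_owned(),
// );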