risingwave_batch_executors/executor/
s3_file_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use risingwave_common::array::DataChunk;
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_connector::source::iceberg::{
20    FileScanBackend, extract_bucket_and_file_name, new_s3_operator, read_parquet_file,
21};
22use risingwave_pb::batch_plan::file_scan_node;
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24
25use crate::error::BatchError;
26use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
27
/// File formats supported by the S3 file scan executor.
///
/// Currently only Parquet is implemented; `do_execute` asserts this at
/// runtime. New variants require corresponding read support.
//
// `Eq` accompanies `PartialEq` (the equivalence is total for a fieldless
// enum), and `Clone`/`Copy` are free for a dataless enum.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum FileFormat {
    Parquet,
}
32
/// S3 file scan executor. Currently only supports the Parquet file format.
///
/// Reads each file listed in `file_location` from S3 and yields its rows as
/// `DataChunk`s (see `do_execute`).
pub struct S3FileScanExecutor {
    // Format of the input files; asserted to be `Parquet` at execution time.
    file_format: FileFormat,
    // Full S3 locations of the files to scan; the bucket name is extracted
    // from each location at execution time.
    file_location: Vec<String>,
    // AWS region used to build the S3 operator.
    s3_region: String,
    // Credentials used to build the S3 operator.
    s3_access_key: String,
    s3_secret_key: String,
    // Endpoint passed to `new_s3_operator` (e.g. for S3-compatible stores).
    s3_endpoint: String,
    // Passed to `read_parquet_file`; presumably caps rows per yielded chunk —
    // confirm against that function's contract.
    batch_size: usize,
    // Output schema reported via `Executor::schema`.
    schema: Schema,
    // Identity string reported via `Executor::identity`.
    identity: String,
}
45
46impl Executor for S3FileScanExecutor {
47    fn schema(&self) -> &risingwave_common::catalog::Schema {
48        &self.schema
49    }
50
51    fn identity(&self) -> &str {
52        &self.identity
53    }
54
55    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
56        self.do_execute().boxed()
57    }
58}
59
impl S3FileScanExecutor {
    /// Creates a new `S3FileScanExecutor`.
    ///
    /// Note that `s3_endpoint` is the *last* parameter, after `schema` and
    /// `identity`, even though the corresponding field is declared earlier —
    /// mind the argument order at call sites.
    pub fn new(
        file_format: FileFormat,
        file_location: Vec<String>,
        s3_region: String,
        s3_access_key: String,
        s3_secret_key: String,
        batch_size: usize,
        schema: Schema,
        identity: String,
        s3_endpoint: String,
    ) -> Self {
        Self {
            file_format,
            file_location,
            s3_region,
            s3_access_key,
            s3_secret_key,
            s3_endpoint,
            batch_size,
            schema,
            identity,
        }
    }

    /// Streams the contents of all configured files as `DataChunk`s.
    ///
    /// # Panics
    ///
    /// Panics if `file_format` is not [`FileFormat::Parquet`] — the only
    /// format currently supported.
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        assert_eq!(self.file_format, FileFormat::Parquet);
        for file in self.file_location {
            // Each location may point into a different bucket, so the bucket
            // is re-extracted and a fresh operator built per file.
            let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::S3)?;
            let op = new_s3_operator(
                self.s3_region.clone(),
                self.s3_access_key.clone(),
                self.s3_secret_key.clone(),
                bucket.clone(),
                self.s3_endpoint.clone(),
            )?;
            // NOTE(review): the `None`/`0`/`None`/`None` arguments presumably
            // mean "no projection / no row restriction / offset 0" — confirm
            // against `read_parquet_file`'s signature before changing.
            let chunk_stream =
                read_parquet_file(op, file_name, None, None, self.batch_size, 0, None, None)
                    .await?;
            #[for_await]
            for stream_chunk in chunk_stream {
                let stream_chunk = stream_chunk?;
                // Keep only the data part; the second component of
                // `into_parts` (presumably stream ops/visibility) is dropped,
                // since batch executors yield plain `DataChunk`s.
                let (data_chunk, _) = stream_chunk.into_parts();
                yield data_chunk;
            }
        }
    }
}
109
/// Builder that constructs an [`S3FileScanExecutor`] from a `FileScan` plan node.
pub struct FileScanExecutorBuilder {}
111
impl BoxedExecutorBuilder for FileScanExecutorBuilder {
    /// Builds a boxed [`S3FileScanExecutor`] from the `FileScan` body of the
    /// plan node carried by `source`. `_inputs` is unused: a file scan is a
    /// leaf in the batch plan.
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        _inputs: Vec<BoxedExecutor>,
    ) -> crate::error::Result<BoxedExecutor> {
        // Expect the node body to be a FileScan; return an error otherwise.
        let file_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::FileScan
        )?;

        Ok(Box::new(S3FileScanExecutor::new(
            // Decode the proto file format. `Unspecified` is treated as
            // unreachable — presumably rejected during planning; confirm
            // upstream validation before relying on this.
            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
                file_scan_node::FileFormat::Unspecified => unreachable!(),
            },
            file_scan_node.file_location.clone(),
            file_scan_node.s3_region.clone(),
            file_scan_node.s3_access_key.clone(),
            file_scan_node.s3_secret_key.clone(),
            // Chunk size comes from the batch developer configuration.
            source.context().get_config().developer.chunk_size,
            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
            source.plan_node().get_identity().clone(),
            file_scan_node.s3_endpoint.clone(),
        )))
    }
}