risingwave_batch_executors/executor/
gcs_file_scan.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::{
    FileScanBackend, extract_bucket_and_file_name, new_gcs_operator, read_parquet_file,
};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};

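/// File format of the objects to scan. Only Parquet is supported for now.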
#[derive(PartialEq, Debug)]
pub enum FileFormat {
    Parquet,
}

/// GCS file scan executor. Currently only supports the Parquet file format.
pub struct GcsFileScanExecutor {
    file_format: FileFormat,
    /// Locations of the files to scan.
    file_location: Vec<String>,
    /// Credential used to authenticate with GCS.
    gcs_credential: String,
    /// Maximum number of rows in each yielded `DataChunk`.
    batch_size: usize,
    schema: Schema,
    identity: String,
}

impl Executor for GcsFileScanExecutor {
    fn schema(&self) -> &risingwave_common::catalog::Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl GcsFileScanExecutor {
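    /// Creates a new [`GcsFileScanExecutor`].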
    pub fn new(
        file_format: FileFormat,
        file_location: Vec<String>,
        gcs_credential: String,
        batch_size: usize,
        schema: Schema,
        identity: String,
    ) -> Self {
        Self {
            file_format,
            file_location,
            gcs_credential,
            batch_size,
            schema,
            identity,
        }
    }

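    /// Reads each file in `file_location` and yields its rows as a stream of
    /// `DataChunk`s.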
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        assert_eq!(self.file_format, FileFormat::Parquet);
        for file in self.file_location {
            // Split the location into its bucket and object name, then build a
            // GCS operator scoped to that bucket.
            let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::Gcs)?;
            let op = new_gcs_operator(self.gcs_credential.clone(), bucket.clone())?;
            let chunk_stream =
                read_parquet_file(op, file_name, None, None, self.batch_size, 0, None, None)
                    .await?;
            #[for_await]
            for stream_chunk in chunk_stream {
                let stream_chunk = stream_chunk?;
                // Only the data part is relevant for a batch scan; the ops are discarded.
                let (data_chunk, _) = stream_chunk.into_parts();
                yield data_chunk;
            }
        }
    }
}

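/// Builder that constructs a [`GcsFileScanExecutor`] from a `GcsFileScan` plan node.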
pub struct GcsFileScanExecutorBuilder {}

impl BoxedExecutorBuilder for GcsFileScanExecutorBuilder {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        _inputs: Vec<BoxedExecutor>,
    ) -> crate::error::Result<BoxedExecutor> {
        let file_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::GcsFileScan
        )?;

        Ok(Box::new(GcsFileScanExecutor::new(
            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
                // The planner never produces an unspecified file format.
                file_scan_node::FileFormat::Unspecified => unreachable!(),
            },
            file_scan_node.file_location.clone(),
            file_scan_node.credential.clone(),
            source.context().get_config().developer.chunk_size,
            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
            source.plan_node().get_identity().clone(),
        )))
    }
}