risingwave_batch_executors/executor/
gcs_file_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use futures_util::stream::StreamExt;
17use risingwave_common::array::DataChunk;
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_connector::source::iceberg::{
20    FileScanBackend, extract_bucket_and_file_name, new_gcs_operator, read_parquet_file,
21};
22use risingwave_pb::batch_plan::file_scan_node;
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24
25use crate::error::BatchError;
26use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
27
/// File formats supported by the GCS file scan executor.
///
/// Currently only parquet is supported.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileFormat {
    Parquet,
}
32
/// Gcs file scan executor. Currently only support parquet file format.
pub struct GcsFileScanExecutor {
    // Format of the files to read; asserted to be `Parquet` in `do_execute`.
    file_format: FileFormat,
    // Locations of the files to scan; each entry is parsed by
    // `extract_bucket_and_file_name` with the GCS backend.
    file_location: Vec<String>,
    // Credential string handed to `new_gcs_operator` for every file.
    gcs_credential: String,
    // Batch size forwarded to `read_parquet_file`.
    batch_size: usize,
    // Output schema reported via `Executor::schema`.
    schema: Schema,
    // Identity string reported via `Executor::identity`.
    identity: String,
}
42
43impl Executor for GcsFileScanExecutor {
44    fn schema(&self) -> &risingwave_common::catalog::Schema {
45        &self.schema
46    }
47
48    fn identity(&self) -> &str {
49        &self.identity
50    }
51
52    fn execute(self: Box<Self>) -> super::BoxedDataChunkStream {
53        self.do_execute().boxed()
54    }
55}
56
57impl GcsFileScanExecutor {
58    pub fn new(
59        file_format: FileFormat,
60        file_location: Vec<String>,
61        gcs_credential: String,
62        batch_size: usize,
63        schema: Schema,
64        identity: String,
65    ) -> Self {
66        Self {
67            file_format,
68            file_location,
69            gcs_credential,
70            batch_size,
71            schema,
72            identity,
73        }
74    }
75
76    #[try_stream(ok = DataChunk, error = BatchError)]
77    async fn do_execute(self: Box<Self>) {
78        assert_eq!(self.file_format, FileFormat::Parquet);
79        for file in self.file_location {
80            let (bucket, file_name) = extract_bucket_and_file_name(&file, &FileScanBackend::Gcs)?;
81            let op = new_gcs_operator(self.gcs_credential.clone(), bucket.clone())?;
82            let chunk_stream = read_parquet_file(
83                op,
84                file_name,
85                None,
86                None,
87                false,
88                self.batch_size,
89                0,
90                None,
91                None,
92            )
93            .await?;
94            #[for_await]
95            for stream_chunk in chunk_stream {
96                let stream_chunk = stream_chunk?;
97                let (data_chunk, _) = stream_chunk.into_parts();
98                yield data_chunk;
99            }
100        }
101    }
102}
103
104pub struct GcsFileScanExecutorBuilder {}
105
106impl BoxedExecutorBuilder for GcsFileScanExecutorBuilder {
107    async fn new_boxed_executor(
108        source: &ExecutorBuilder<'_>,
109        _inputs: Vec<BoxedExecutor>,
110    ) -> crate::error::Result<BoxedExecutor> {
111        let file_scan_node = try_match_expand!(
112            source.plan_node().get_node_body().unwrap(),
113            NodeBody::GcsFileScan
114        )?;
115
116        Ok(Box::new(GcsFileScanExecutor::new(
117            match file_scan_node::FileFormat::try_from(file_scan_node.file_format).unwrap() {
118                file_scan_node::FileFormat::Parquet => FileFormat::Parquet,
119                file_scan_node::FileFormat::Unspecified => unreachable!(),
120            },
121            file_scan_node.file_location.clone(),
122            file_scan_node.credential.clone(),
123            source.context().get_config().developer.chunk_size,
124            Schema::from_iter(file_scan_node.columns.iter().map(Field::from)),
125            source.plan_node().get_identity().clone(),
126        )))
127    }
128}