risingwave_frontend/optimizer/plan_node/generic/
file_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use educe::Educe;
16use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
17
18use super::GenericPlanNode;
19use crate::optimizer::optimizer_context::OptimizerContextRef;
20use crate::optimizer::property::FunctionalDependencySet;
21
/// File formats a file scan can be declared with.
///
/// `Parquet` is the only variant defined here.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub enum FileFormat {
    /// Parquet files.
    Parquet,
}
26
/// Storage backends a file scan can be declared against.
///
/// Stored in the `storage_type` field of each file scan struct in this module.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub enum StorageType {
    /// S3 storage.
    S3,
    /// GCS storage.
    Gcs,
    /// Azblob storage.
    Azblob,
}
33
34#[allow(clippy::enum_variant_names)]
35#[derive(Debug, Clone, Educe)]
36#[educe(PartialEq, Eq, Hash)]
37pub enum FileScanBackend {
38    FileScan(FileScan),
39    GcsFileScan(GcsFileScan),
40    AzblobFileScan(AzblobFileScan),
41}
42
43#[derive(Debug, Clone, Educe)]
44#[educe(PartialEq, Eq, Hash)]
45pub struct GcsFileScan {
46    pub schema: Schema,
47    pub file_format: FileFormat,
48    pub storage_type: StorageType,
49    pub credential: String,
50    pub file_location: Vec<String>,
51
52    #[educe(PartialEq(ignore))]
53    #[educe(Hash(ignore))]
54    pub ctx: OptimizerContextRef,
55}
56
57impl GenericPlanNode for GcsFileScan {
58    fn schema(&self) -> Schema {
59        self.schema.clone()
60    }
61
62    fn stream_key(&self) -> Option<Vec<usize>> {
63        None
64    }
65
66    fn ctx(&self) -> OptimizerContextRef {
67        self.ctx.clone()
68    }
69
70    fn functional_dependency(&self) -> FunctionalDependencySet {
71        FunctionalDependencySet::new(self.schema.len())
72    }
73}
74
75#[derive(Debug, Clone, Educe)]
76#[educe(PartialEq, Eq, Hash)]
77pub struct AzblobFileScan {
78    pub schema: Schema,
79    pub file_format: FileFormat,
80    pub storage_type: StorageType,
81    pub account_name: String,
82    pub account_key: String,
83    pub endpoint: String,
84    pub file_location: Vec<String>,
85
86    #[educe(PartialEq(ignore))]
87    #[educe(Hash(ignore))]
88    pub ctx: OptimizerContextRef,
89}
90
91impl GenericPlanNode for AzblobFileScan {
92    fn schema(&self) -> Schema {
93        self.schema.clone()
94    }
95
96    fn stream_key(&self) -> Option<Vec<usize>> {
97        None
98    }
99
100    fn ctx(&self) -> OptimizerContextRef {
101        self.ctx.clone()
102    }
103
104    fn functional_dependency(&self) -> FunctionalDependencySet {
105        FunctionalDependencySet::new(self.schema.len())
106    }
107}
108
109impl FileScan {
110    pub fn columns(&self) -> Vec<ColumnDesc> {
111        self.schema
112            .fields
113            .iter()
114            .enumerate()
115            .map(|(i, f)| {
116                ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone())
117            })
118            .collect()
119    }
120}
121#[derive(Debug, Clone, Educe)]
122#[educe(PartialEq, Eq, Hash)]
123pub struct FileScan {
124    pub schema: Schema,
125    pub file_format: FileFormat,
126    pub storage_type: StorageType,
127    pub s3_region: String,
128    pub s3_access_key: String,
129    pub s3_secret_key: String,
130    pub file_location: Vec<String>,
131    pub s3_endpoint: String,
132
133    #[educe(PartialEq(ignore))]
134    #[educe(Hash(ignore))]
135    pub ctx: OptimizerContextRef,
136}
137
138impl GenericPlanNode for FileScan {
139    fn schema(&self) -> Schema {
140        self.schema.clone()
141    }
142
143    fn stream_key(&self) -> Option<Vec<usize>> {
144        None
145    }
146
147    fn ctx(&self) -> OptimizerContextRef {
148        self.ctx.clone()
149    }
150
151    fn functional_dependency(&self) -> FunctionalDependencySet {
152        FunctionalDependencySet::new(self.schema.len())
153    }
154}
155
156impl GcsFileScan {
157    pub fn columns(&self) -> Vec<ColumnDesc> {
158        self.schema
159            .fields
160            .iter()
161            .enumerate()
162            .map(|(i, f)| {
163                ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone())
164            })
165            .collect()
166    }
167}
168
169impl AzblobFileScan {
170    pub fn columns(&self) -> Vec<ColumnDesc> {
171        self.schema
172            .fields
173            .iter()
174            .enumerate()
175            .map(|(i, f)| {
176                ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone())
177            })
178            .collect()
179    }
180}
181
182impl GenericPlanNode for FileScanBackend {
183    fn schema(&self) -> Schema {
184        match self {
185            FileScanBackend::FileScan(file_scan) => file_scan.schema(),
186            FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.schema(),
187
188            FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.schema(),
189        }
190    }
191
192    fn stream_key(&self) -> Option<Vec<usize>> {
193        match self {
194            FileScanBackend::FileScan(file_scan) => file_scan.stream_key(),
195            FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.stream_key(),
196
197            FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.stream_key(),
198        }
199    }
200
201    fn ctx(&self) -> OptimizerContextRef {
202        match self {
203            FileScanBackend::FileScan(file_scan) => file_scan.ctx(),
204            FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.ctx(),
205
206            FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.ctx(),
207        }
208    }
209
210    fn functional_dependency(&self) -> FunctionalDependencySet {
211        match self {
212            FileScanBackend::FileScan(file_scan) => file_scan.functional_dependency(),
213            FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.functional_dependency(),
214
215            FileScanBackend::AzblobFileScan(azblob_file_scan) => {
216                azblob_file_scan.functional_dependency()
217            }
218        }
219    }
220}
221
222impl FileScanBackend {
223    pub fn file_location(&self) -> Vec<String> {
224        match self {
225            FileScanBackend::FileScan(file_scan) => file_scan.file_location.clone(),
226            FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.file_location.clone(),
227
228            FileScanBackend::AzblobFileScan(azblob_file_scan) => {
229                azblob_file_scan.file_location.clone()
230            }
231        }
232    }
233}