risingwave_frontend/optimizer/plan_node/generic/
file_scan.rs1use educe::Educe;
16use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
17
18use super::GenericPlanNode;
19use crate::optimizer::optimizer_context::OptimizerContextRef;
20use crate::optimizer::property::FunctionalDependencySet;
21
/// File format read by a file scan.
///
/// `Parquet` is currently the only variant.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum FileFormat {
    /// Parquet files.
    Parquet,
}
26
/// Object-storage backend that a file scan reads from.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum StorageType {
    /// S3 storage.
    S3,
    /// GCS storage.
    Gcs,
    /// Azure Blob (`azblob`) storage.
    Azblob,
}
33
34#[allow(clippy::enum_variant_names)]
35#[derive(Debug, Clone, Educe)]
36#[educe(PartialEq, Eq, Hash)]
37pub enum FileScanBackend {
38 FileScan(FileScan),
39 GcsFileScan(GcsFileScan),
40 AzblobFileScan(AzblobFileScan),
41}
42
/// Parameters of a file scan held by the `GcsFileScan` variant of `FileScanBackend`:
/// an output schema, file format, storage tag, a credential string and the file list.
///
/// `PartialEq`/`Eq`/`Hash` are derived through `educe` so that `ctx` can be excluded
/// from equality and hashing.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct GcsFileScan {
    /// Output schema; returned unchanged by `GenericPlanNode::schema`.
    pub schema: Schema,
    /// Format of the files to scan.
    pub file_format: FileFormat,
    /// Storage backend tag.
    /// NOTE(review): presumably always `StorageType::Gcs` for this struct, but nothing
    /// here enforces it — confirm at construction sites.
    pub storage_type: StorageType,
    /// Credential string, presumably used to access the files.
    /// NOTE(review): the derived `Debug` impl prints this field verbatim; if it holds a
    /// secret, consider redacting it from `Debug` output.
    pub credential: String,
    /// Locations of the files to scan; exposed via `FileScanBackend::file_location`.
    pub file_location: Vec<String>,

    /// Optimizer context; ignored by `PartialEq` and `Hash`, so scans with identical
    /// parameters compare equal regardless of context.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,
}
56
57impl GenericPlanNode for GcsFileScan {
58 fn schema(&self) -> Schema {
59 self.schema.clone()
60 }
61
62 fn stream_key(&self) -> Option<Vec<usize>> {
63 None
64 }
65
66 fn ctx(&self) -> OptimizerContextRef {
67 self.ctx.clone()
68 }
69
70 fn functional_dependency(&self) -> FunctionalDependencySet {
71 FunctionalDependencySet::new(self.schema.len())
72 }
73}
74
/// Parameters of a file scan held by the `AzblobFileScan` variant of `FileScanBackend`:
/// an output schema, file format, storage tag, account name/key, an endpoint and the
/// file list.
///
/// `PartialEq`/`Eq`/`Hash` are derived through `educe` so that `ctx` can be excluded
/// from equality and hashing.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct AzblobFileScan {
    /// Output schema; returned unchanged by `GenericPlanNode::schema`.
    pub schema: Schema,
    /// Format of the files to scan.
    pub file_format: FileFormat,
    /// Storage backend tag.
    /// NOTE(review): presumably always `StorageType::Azblob` for this struct, but nothing
    /// here enforces it — confirm at construction sites.
    pub storage_type: StorageType,
    /// Storage account name, presumably used for authentication.
    pub account_name: String,
    /// Storage account key, presumably used for authentication.
    /// NOTE(review): the derived `Debug` impl prints this field verbatim; if it holds a
    /// secret, consider redacting it from `Debug` output.
    pub account_key: String,
    /// Endpoint string for the storage service — TODO confirm expected format.
    pub endpoint: String,
    /// Locations of the files to scan; exposed via `FileScanBackend::file_location`.
    pub file_location: Vec<String>,

    /// Optimizer context; ignored by `PartialEq` and `Hash`, so scans with identical
    /// parameters compare equal regardless of context.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,
}
90
91impl GenericPlanNode for AzblobFileScan {
92 fn schema(&self) -> Schema {
93 self.schema.clone()
94 }
95
96 fn stream_key(&self) -> Option<Vec<usize>> {
97 None
98 }
99
100 fn ctx(&self) -> OptimizerContextRef {
101 self.ctx.clone()
102 }
103
104 fn functional_dependency(&self) -> FunctionalDependencySet {
105 FunctionalDependencySet::new(self.schema.len())
106 }
107}
108
109impl FileScan {
110 pub fn columns(&self) -> Vec<ColumnDesc> {
111 self.schema
112 .fields
113 .iter()
114 .enumerate()
115 .map(|(i, f)| {
116 ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone())
117 })
118 .collect()
119 }
120}
/// Parameters of a file scan held by the `FileScan` variant of `FileScanBackend`:
/// an output schema, file format, storage tag, `s3_*` connection settings and the
/// file list.
///
/// `PartialEq`/`Eq`/`Hash` are derived through `educe` so that `ctx` can be excluded
/// from equality and hashing.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct FileScan {
    /// Output schema; returned unchanged by `GenericPlanNode::schema`.
    pub schema: Schema,
    /// Format of the files to scan.
    pub file_format: FileFormat,
    /// Storage backend tag.
    /// NOTE(review): presumably `StorageType::S3` for this struct, but nothing here
    /// enforces it — confirm at construction sites.
    pub storage_type: StorageType,
    /// S3 region setting.
    pub s3_region: String,
    /// S3 access key.
    /// NOTE(review): the derived `Debug` impl prints this field verbatim.
    pub s3_access_key: String,
    /// S3 secret key.
    /// NOTE(review): the derived `Debug` impl prints this field verbatim; consider
    /// redacting it from `Debug` output.
    pub s3_secret_key: String,
    /// Locations of the files to scan; exposed via `FileScanBackend::file_location`.
    pub file_location: Vec<String>,
    /// S3 endpoint setting — TODO confirm expected format.
    pub s3_endpoint: String,

    /// Optimizer context; ignored by `PartialEq` and `Hash`, so scans with identical
    /// parameters compare equal regardless of context.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,
}
137
138impl GenericPlanNode for FileScan {
139 fn schema(&self) -> Schema {
140 self.schema.clone()
141 }
142
143 fn stream_key(&self) -> Option<Vec<usize>> {
144 None
145 }
146
147 fn ctx(&self) -> OptimizerContextRef {
148 self.ctx.clone()
149 }
150
151 fn functional_dependency(&self) -> FunctionalDependencySet {
152 FunctionalDependencySet::new(self.schema.len())
153 }
154}
155
156impl GcsFileScan {
157 pub fn columns(&self) -> Vec<ColumnDesc> {
158 self.schema
159 .fields
160 .iter()
161 .enumerate()
162 .map(|(i, f)| {
163 ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone())
164 })
165 .collect()
166 }
167}
168
169impl AzblobFileScan {
170 pub fn columns(&self) -> Vec<ColumnDesc> {
171 self.schema
172 .fields
173 .iter()
174 .enumerate()
175 .map(|(i, f)| {
176 ColumnDesc::named(f.name.clone(), ColumnId::new(i as i32), f.data_type.clone())
177 })
178 .collect()
179 }
180}
181
182impl GenericPlanNode for FileScanBackend {
183 fn schema(&self) -> Schema {
184 match self {
185 FileScanBackend::FileScan(file_scan) => file_scan.schema(),
186 FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.schema(),
187
188 FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.schema(),
189 }
190 }
191
192 fn stream_key(&self) -> Option<Vec<usize>> {
193 match self {
194 FileScanBackend::FileScan(file_scan) => file_scan.stream_key(),
195 FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.stream_key(),
196
197 FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.stream_key(),
198 }
199 }
200
201 fn ctx(&self) -> OptimizerContextRef {
202 match self {
203 FileScanBackend::FileScan(file_scan) => file_scan.ctx(),
204 FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.ctx(),
205
206 FileScanBackend::AzblobFileScan(azblob_file_scan) => azblob_file_scan.ctx(),
207 }
208 }
209
210 fn functional_dependency(&self) -> FunctionalDependencySet {
211 match self {
212 FileScanBackend::FileScan(file_scan) => file_scan.functional_dependency(),
213 FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.functional_dependency(),
214
215 FileScanBackend::AzblobFileScan(azblob_file_scan) => {
216 azblob_file_scan.functional_dependency()
217 }
218 }
219 }
220}
221
222impl FileScanBackend {
223 pub fn file_location(&self) -> Vec<String> {
224 match self {
225 FileScanBackend::FileScan(file_scan) => file_scan.file_location.clone(),
226 FileScanBackend::GcsFileScan(gcs_file_scan) => gcs_file_scan.file_location.clone(),
227
228 FileScanBackend::AzblobFileScan(azblob_file_scan) => {
229 azblob_file_scan.file_location.clone()
230 }
231 }
232 }
233}