risingwave_frontend/optimizer/plan_node/generic/source.rs

use std::rc::Rc;

use educe::Educe;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::WithPropertiesExt;
use risingwave_sqlparser::ast::AsOf;

use super::super::utils::TableCatalogBuilder;
use super::GenericPlanNode;
use crate::TableCatalog;
use crate::catalog::source_catalog::SourceCatalog;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::{FunctionalDependencySet, StreamKind};

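/// In which planning scenario this source node appears.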
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[expect(clippy::enum_variant_names)]
pub enum SourceNodeKind {
    /// Planning `CREATE TABLE` with a connector.
    CreateTable,
    /// Planning `CREATE SOURCE` as a shared (streaming) source.
    CreateSharedSource,
    /// Planning a `CREATE MATERIALIZED VIEW` or a batch query that reads from an existing source.
    CreateMViewOrBatch,
}

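/// Common fields shared by the logical/stream/batch `Source` plan nodes.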
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct Source {
    /// The source catalog, if the node reads from an external connector.
    pub catalog: Option<Rc<SourceCatalog>>,

    /// Columns produced by this node.
    pub column_catalog: Vec<ColumnCatalog>,
    /// Index of the hidden row-id column, if any.
    pub row_id_index: Option<usize>,

    pub kind: SourceNodeKind,

    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,

    /// Time-travel specification (`AS OF ...`), if any.
    pub as_of: Option<AsOf>,
}

impl GenericPlanNode for Source {
    fn schema(&self) -> Schema {
        let fields = self
            .column_catalog
            .iter()
            .map(|c| (&c.column_desc).into())
            .collect();
        Schema { fields }
    }

    fn stream_key(&self) -> Option<Vec<usize>> {
        // The hidden row-id column, if present, serves as the stream key;
        // otherwise derive it from the catalog's primary-key column ids.
        if let Some(idx) = self.row_id_index {
            Some(vec![idx])
        } else if let Some(catalog) = &self.catalog {
            catalog
                .pk_col_ids
                .iter()
                .map(|id| {
                    self.column_catalog
                        .iter()
                        .position(|c| c.column_id() == *id)
                })
                .collect::<Option<Vec<_>>>()
        } else {
            None
        }
    }

    fn ctx(&self) -> OptimizerContextRef {
        self.ctx.clone()
    }

    fn functional_dependency(&self) -> FunctionalDependencySet {
        let pk_indices = self.stream_key();
        match pk_indices {
            Some(pk_indices) => {
                debug_assert!(
                    pk_indices
                        .iter()
                        .all(|idx| *idx < self.column_catalog.len())
                );
                FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices)
            }
            None => FunctionalDependencySet::new(self.column_catalog.len()),
        }
    }
}

impl Source {
    pub fn stream_kind(&self) -> StreamKind {
        if let Some(catalog) = &self.catalog {
            if catalog.append_only {
                StreamKind::AppendOnly
            } else {
                StreamKind::Upsert
            }
        } else {
            // Without a source catalog, default to append-only.
            StreamKind::AppendOnly
        }
    }

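    /// Rewrites the node to list files instead of rows: the output columns become the
    /// file metadata needed to plan the scan (Iceberg: `file_path`, `file_scan_task`;
    /// other file-system connectors: `filename`, `last_edit_time`, `file_size`).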
    pub fn file_list_node(core: Self) -> Self {
        let column_catalog = if core.is_iceberg_connector() {
            vec![
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field {
                            name: "file_path".to_owned(),
                            data_type: DataType::Varchar,
                        },
                        0,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field {
                            name: "file_scan_task".to_owned(),
                            data_type: DataType::Jsonb,
                        },
                        1,
                    ),
                    is_hidden: false,
                },
            ]
        } else if core.is_new_fs_connector() {
            vec![
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field {
                            name: "filename".to_owned(),
                            data_type: DataType::Varchar,
                        },
                        0,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field {
                            name: "last_edit_time".to_owned(),
                            data_type: DataType::Timestamptz,
                        },
                        1,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field {
                            name: "file_size".to_owned(),
                            data_type: DataType::Int64,
                        },
                        2,
                    ),
                    is_hidden: false,
                },
            ]
        } else {
            unreachable!()
        };
        Self {
            column_catalog,
            row_id_index: None,
            ..core
        }
    }

    pub fn is_new_fs_connector(&self) -> bool {
        self.catalog
            .as_ref()
            .is_some_and(|catalog| catalog.with_properties.is_new_fs_connector())
    }

    pub fn is_iceberg_connector(&self) -> bool {
        self.catalog
            .as_ref()
            .is_some_and(|catalog| catalog.with_properties.is_iceberg_connector())
    }

    pub fn is_kafka_connector(&self) -> bool {
        self.catalog
            .as_ref()
            .is_some_and(|catalog| catalog.with_properties.is_kafka_connector())
    }

    pub fn is_batch_connector(&self) -> bool {
        self.catalog
            .as_ref()
            .is_some_and(|catalog| catalog.with_properties.is_batch_connector())
    }

    pub fn requires_singleton(&self) -> bool {
        self.is_iceberg_connector() || self.is_batch_connector()
    }

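    /// Whether the connector supports `AS OF` time travel (currently only Iceberg).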
    pub fn support_time_travel(&self) -> bool {
        self.is_iceberg_connector()
    }

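    /// Drops hidden columns other than the row-id column (e.g. Iceberg metadata columns)
    /// when planning a materialized view or batch query on a non-shared source,
    /// shifting `row_id_index` to account for the removed columns.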
    pub fn exclude_iceberg_hidden_columns(mut self) -> Self {
        let Some(catalog) = &mut self.catalog else {
            return self;
        };
        if catalog.info.is_shared() {
            return self;
        }
        if self.kind != SourceNodeKind::CreateMViewOrBatch {
            return self;
        }

        let prune = |col: &ColumnCatalog| col.is_hidden() && !col.is_row_id_column();

        self.row_id_index = self.row_id_index.map(|idx| {
            // Shift the index left by the number of pruned columns that precede it.
            let mut cnt = 0;
            for col in self.column_catalog.iter().take(idx + 1) {
                if prune(col) {
                    cnt += 1;
                }
            }
            idx - cnt
        });
        self.column_catalog.retain(|c| !prune(c));
        self
    }

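    /// Removes generated columns from the column catalog, returning the updated node
    /// together with the original `row_id_index` (the stored index is shifted to
    /// account for the removed columns).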
    pub fn exclude_generated_columns(mut self) -> (Self, Option<usize>) {
        let original_row_id_index = self.row_id_index;
        self.row_id_index = original_row_id_index.map(|idx| {
            let mut cnt = 0;
            for col in self.column_catalog.iter().take(idx + 1) {
                if col.is_generated() {
                    cnt += 1;
                }
            }
            idx - cnt
        });
        self.column_catalog.retain(|c| !c.is_generated());
        (self, original_row_id_index)
    }

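    /// Builds the catalog of the source's internal state table, which records the
    /// consumption progress per split: `partition_id` (varchar, primary key) maps to
    /// `offset_info` (jsonb). When the source is distributed, the table is distributed
    /// by `partition_id`; otherwise it is a singleton table.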
    pub fn infer_internal_table_catalog(is_distributed: bool) -> TableCatalog {
        let mut builder = TableCatalogBuilder::default();

        let key = Field {
            data_type: DataType::Varchar,
            name: "partition_id".to_owned(),
        };
        let value = Field {
            data_type: DataType::Jsonb,
            name: "offset_info".to_owned(),
        };

        let ordered_col_idx = builder.add_column(&key);
        builder.add_column(&value);
        builder.add_order_column(ordered_col_idx, OrderType::ascending());

        builder.build(
            if is_distributed {
                vec![ordered_col_idx]
            } else {
                vec![]
            },
            1,
        )
    }
}