// risingwave_frontend/optimizer/plan_node/generic/source.rs

use std::rc::Rc;

use educe::Educe;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::WithPropertiesExt;
use risingwave_sqlparser::ast::AsOf;

use super::super::utils::TableCatalogBuilder;
use super::GenericPlanNode;
use crate::TableCatalog;
use crate::catalog::source_catalog::SourceCatalog;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;

/// In which scenario the source plan node is created.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[expect(clippy::enum_variant_names)]
pub enum SourceNodeKind {
    /// `CREATE TABLE` with an external connector.
    CreateTable,
    /// `CREATE SOURCE` that runs as a shared streaming job.
    CreateSharedSource,
    /// `CREATE MATERIALIZED VIEW` on a source, or a batch query reading from a source.
    CreateMViewOrBatch,
}

#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct Source {
    /// The source catalog, if this plan node is backed by one.
    pub catalog: Option<Rc<SourceCatalog>>,

    /// Output columns and the optional row-id column index. Kept here (in
    /// addition to `catalog`) so they are available even when `catalog` is `None`.
    pub column_catalog: Vec<ColumnCatalog>,
    pub row_id_index: Option<usize>,

    pub kind: SourceNodeKind,

    /// The optimizer context; ignored for equality and hashing.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,

    /// Time-travel (`AS OF`) specification, if any.
    pub as_of: Option<AsOf>,
}

impl GenericPlanNode for Source {
    fn schema(&self) -> Schema {
        let fields = self
            .column_catalog
            .iter()
            .map(|c| (&c.column_desc).into())
            .collect();
        Schema { fields }
    }

    fn stream_key(&self) -> Option<Vec<usize>> {
        self.row_id_index.map(|idx| vec![idx])
    }

    fn ctx(&self) -> OptimizerContextRef {
        self.ctx.clone()
    }

    fn functional_dependency(&self) -> FunctionalDependencySet {
        let pk_indices = self.stream_key();
        match pk_indices {
            Some(pk_indices) => {
                debug_assert!(
                    pk_indices
                        .iter()
                        .all(|idx| *idx < self.column_catalog.len())
                );
                FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices)
            }
            None => FunctionalDependencySet::new(self.column_catalog.len()),
        }
    }
}
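// Behavior sketch for the `GenericPlanNode` impl above (column names are
// hypothetical): for a source with columns `[a, b, _row_id]` and
// `row_id_index == Some(2)`,
//   - `schema()` yields the three fields derived from the column descriptors,
//   - `stream_key()` yields `Some(vec![2])`, i.e. the row-id column,
//   - `functional_dependency()` records that column 2 determines all columns.
// Without a row-id column, `stream_key()` is `None` and the FD set has no key.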

impl Source {
    /// Rewrites this source into a node that lists the source's files rather
    /// than reading their contents: an iceberg source exposes (`file_path`,
    /// `file_scan_task`); a new fs source exposes (`filename`, `last_edit_time`,
    /// `file_size`).
    pub fn file_list_node(core: Self) -> Self {
        let column_catalog = if core.is_iceberg_connector() {
            vec![
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field {
                            name: "file_path".to_owned(),
                            data_type: DataType::Varchar,
                        },
                        0,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field {
                            name: "file_scan_task".to_owned(),
                            data_type: DataType::Jsonb,
                        },
                        1,
                    ),
                    is_hidden: false,
                },
            ]
        } else if core.is_new_fs_connector() {
            vec![
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field {
                            name: "filename".to_owned(),
                            data_type: DataType::Varchar,
                        },
                        0,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field {
                            name: "last_edit_time".to_owned(),
                            data_type: DataType::Timestamptz,
                        },
                        1,
                    ),
                    is_hidden: false,
                },
                ColumnCatalog {
                    column_desc: ColumnDesc::from_field_with_column_id(
                        &Field {
                            name: "file_size".to_owned(),
                            data_type: DataType::Int64,
                        },
                        2,
                    ),
                    is_hidden: false,
                },
            ]
        } else {
            unreachable!()
        };
        Self {
            column_catalog,
            row_id_index: None,
            ..core
        }
    }
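    // Output sketch of `file_list_node` (derived from the column catalogs built
    // above): an iceberg source lists rows of
    //   (file_path VARCHAR, file_scan_task JSONB),
    // while a new fs source lists rows of
    //   (filename VARCHAR, last_edit_time TIMESTAMPTZ, file_size BIGINT).
    // `row_id_index` is cleared because the listing schema has no row-id column.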

    pub fn is_new_fs_connector(&self) -> bool {
        self.catalog
            .as_ref()
            .is_some_and(|catalog| catalog.with_properties.is_new_fs_connector())
    }

    pub fn is_iceberg_connector(&self) -> bool {
        self.catalog
            .as_ref()
            .is_some_and(|catalog| catalog.with_properties.is_iceberg_connector())
    }

    pub fn is_kafka_connector(&self) -> bool {
        self.catalog
            .as_ref()
            .is_some_and(|catalog| catalog.with_properties.is_kafka_connector())
    }

    /// Time travel (`AS OF`) is only supported on iceberg sources.
    pub fn support_time_travel(&self) -> bool {
        self.is_iceberg_connector()
    }

    /// For an MView or batch query on a non-shared source, drops the hidden
    /// columns (except the row-id column) and shifts `row_id_index` to account
    /// for the removed columns.
    pub fn exclude_iceberg_hidden_columns(mut self) -> Self {
        let Some(catalog) = &mut self.catalog else {
            return self;
        };
        if catalog.info.is_shared() {
            // Shared sources keep their column catalog unchanged.
            return self;
        }
        if self.kind != SourceNodeKind::CreateMViewOrBatch {
            return self;
        }

        let prune = |col: &ColumnCatalog| col.is_hidden() && !col.is_row_id_column();

        self.row_id_index = self.row_id_index.map(|idx| {
            // Shift the row-id index left by the number of pruned columns before it.
            let mut cnt = 0;
            for col in self.column_catalog.iter().take(idx + 1) {
                if prune(col) {
                    cnt += 1;
                }
            }
            idx - cnt
        });
        self.column_catalog.retain(|c| !prune(c));
        self
    }

    /// Drops the generated columns and shifts `row_id_index` accordingly.
    /// Returns the rewritten node together with the original `row_id_index`.
    pub fn exclude_generated_columns(mut self) -> (Self, Option<usize>) {
        let original_row_id_index = self.row_id_index;
        self.row_id_index = original_row_id_index.map(|idx| {
            // Shift the row-id index left by the number of generated columns before it.
            let mut cnt = 0;
            for col in self.column_catalog.iter().take(idx + 1) {
                if col.is_generated() {
                    cnt += 1;
                }
            }
            idx - cnt
        });
        self.column_catalog.retain(|c| !c.is_generated());
        (self, original_row_id_index)
    }
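    // Worked example for `exclude_generated_columns` (column names hypothetical):
    // starting from columns `[id, gen_col (generated), payload, _row_id]` with
    // `row_id_index == Some(3)`, the generated column is dropped, leaving
    // `[id, payload, _row_id]` with `row_id_index == Some(2)`; the returned
    // `Some(3)` preserves the position in the original, unpruned column list.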

    /// Builds the catalog of the source's internal state table, which maps a
    /// `partition_id` (e.g. a split id) to its `offset_info` stored as JSONB.
    pub fn infer_internal_table_catalog(is_distributed: bool) -> TableCatalog {
        let mut builder = TableCatalogBuilder::default();

        let key = Field {
            data_type: DataType::Varchar,
            name: "partition_id".to_owned(),
        };
        let value = Field {
            data_type: DataType::Jsonb,
            name: "offset_info".to_owned(),
        };

        let ordered_col_idx = builder.add_column(&key);
        builder.add_column(&value);
        builder.add_order_column(ordered_col_idx, OrderType::ascending());

        builder.build(
            if is_distributed {
                vec![ordered_col_idx]
            } else {
                vec![]
            },
            1,
        )
    }
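    // State-table sketch produced by `infer_internal_table_catalog` (derived
    // from the builder calls above):
    //   column 0: partition_id  VARCHAR  -- order key (ASC)
    //   column 1: offset_info   JSONB
    // The distribution key is `[partition_id]` when `is_distributed`, otherwise
    // empty (singleton); the trailing `1` passed to `build` is presumably the
    // read-prefix-length hint.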
}