risingwave_frontend/optimizer/plan_node/generic/
source.rs1use std::rc::Rc;
16
17use educe::Educe;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
19use risingwave_common::types::DataType;
20use risingwave_common::util::sort_util::OrderType;
21use risingwave_connector::WithPropertiesExt;
22use risingwave_sqlparser::ast::AsOf;
23
24use super::super::utils::TableCatalogBuilder;
25use super::GenericPlanNode;
26use crate::TableCatalog;
27use crate::catalog::source_catalog::SourceCatalog;
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::property::{FunctionalDependencySet, StreamKind};
30
/// Which kind of statement a [`Source`] plan node is being planned for.
///
/// NOTE(review): the per-variant descriptions below are inferred from the variant
/// names and from how `kind` is checked in this file — confirm against the planner
/// call sites.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// Every variant starts with `Create`, which `clippy::enum_variant_names` flags.
#[expect(clippy::enum_variant_names)]
pub enum SourceNodeKind {
    /// Presumably a source planned while handling `CREATE TABLE` with a connector.
    CreateTable,
    /// Presumably a source planned while handling `CREATE SOURCE` for a shared source.
    CreateSharedSource,
    /// Presumably a source read by `CREATE MATERIALIZED VIEW` or by a batch query.
    /// `Source::exclude_iceberg_hidden_columns` only prunes columns for this kind.
    CreateMViewOrBatch,
}
46
/// Generic source plan node (implements `GenericPlanNode`).
///
/// `PartialEq`, `Eq` and `Hash` are derived through `educe` so that the optimizer
/// context `ctx` can be excluded; every other field participates.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct Source {
    /// The source catalog, if any. When `None`, all `is_*_connector` helpers return
    /// `false` and `stream_kind` reports append-only.
    pub catalog: Option<Rc<SourceCatalog>>,

    /// Columns produced by this node; the output schema is derived from them in order.
    pub column_catalog: Vec<ColumnCatalog>,
    /// Position in `column_catalog` of the row-id column, if any. It alone forms the
    /// stream key.
    pub row_id_index: Option<usize>,

    /// The kind of statement this node is planned for.
    pub kind: SourceNodeKind,

    // The optimizer context does not take part in equality or hashing.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,

    /// The query's `AS OF` clause, if any. Presumably used for time travel; see
    /// `support_time_travel`.
    pub as_of: Option<AsOf>,
}
68
69impl GenericPlanNode for Source {
70 fn schema(&self) -> Schema {
71 let fields = self
72 .column_catalog
73 .iter()
74 .map(|c| (&c.column_desc).into())
75 .collect();
76 Schema { fields }
77 }
78
79 fn stream_key(&self) -> Option<Vec<usize>> {
80 self.row_id_index.map(|idx| vec![idx])
83 }
84
85 fn ctx(&self) -> OptimizerContextRef {
86 self.ctx.clone()
87 }
88
89 fn functional_dependency(&self) -> FunctionalDependencySet {
90 let pk_indices = self.stream_key();
91 match pk_indices {
92 Some(pk_indices) => {
93 debug_assert!(
94 pk_indices
95 .iter()
96 .all(|idx| *idx < self.column_catalog.len())
97 );
98 FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices)
99 }
100 None => FunctionalDependencySet::new(self.column_catalog.len()),
101 }
102 }
103}
104
105impl Source {
106 pub fn stream_kind(&self) -> StreamKind {
107 if let Some(catalog) = &self.catalog {
108 if catalog.append_only {
109 StreamKind::AppendOnly
110 } else {
111 StreamKind::Upsert
114 }
115 } else {
116 StreamKind::AppendOnly
118 }
119 }
120
121 pub fn file_list_node(core: Self) -> Self {
123 let column_catalog = if core.is_iceberg_connector() {
124 vec![
125 ColumnCatalog {
126 column_desc: ColumnDesc::from_field_with_column_id(
127 &Field {
128 name: "file_path".to_owned(),
129 data_type: DataType::Varchar,
130 },
131 0,
132 ),
133 is_hidden: false,
134 },
135 ColumnCatalog {
136 column_desc: ColumnDesc::from_field_with_column_id(
137 &Field {
138 name: "file_scan_task".to_owned(),
139 data_type: DataType::Jsonb,
140 },
141 1,
142 ),
143 is_hidden: false,
144 },
145 ]
146 } else if core.is_new_fs_connector() {
147 vec![
148 ColumnCatalog {
149 column_desc: ColumnDesc::from_field_with_column_id(
150 &Field {
151 name: "filename".to_owned(),
152 data_type: DataType::Varchar,
153 },
154 0,
155 ),
156 is_hidden: false,
157 },
158 ColumnCatalog {
160 column_desc: ColumnDesc::from_field_with_column_id(
161 &Field {
162 name: "last_edit_time".to_owned(),
163 data_type: DataType::Timestamptz,
164 },
165 1,
166 ),
167 is_hidden: false,
168 },
169 ColumnCatalog {
170 column_desc: ColumnDesc::from_field_with_column_id(
171 &Field {
172 name: "file_size".to_owned(),
173 data_type: DataType::Int64,
174 },
175 2,
176 ),
177 is_hidden: false,
178 },
179 ]
180 } else {
181 unreachable!()
182 };
183 Self {
184 column_catalog,
185 row_id_index: None,
186 ..core
187 }
188 }
189
190 pub fn is_new_fs_connector(&self) -> bool {
191 self.catalog
192 .as_ref()
193 .is_some_and(|catalog| catalog.with_properties.is_new_fs_connector())
194 }
195
196 pub fn is_iceberg_connector(&self) -> bool {
197 self.catalog
198 .as_ref()
199 .is_some_and(|catalog| catalog.with_properties.is_iceberg_connector())
200 }
201
202 pub fn is_kafka_connector(&self) -> bool {
203 self.catalog
204 .as_ref()
205 .is_some_and(|catalog| catalog.with_properties.is_kafka_connector())
206 }
207
208 pub fn is_batch_connector(&self) -> bool {
209 self.catalog
210 .as_ref()
211 .is_some_and(|catalog| catalog.with_properties.is_batch_connector())
212 }
213
214 pub fn requires_singleton(&self) -> bool {
215 self.is_iceberg_connector() || self.is_batch_connector()
216 }
217
218 pub fn support_time_travel(&self) -> bool {
220 self.is_iceberg_connector()
221 }
222
223 pub fn exclude_iceberg_hidden_columns(mut self) -> Self {
224 let Some(catalog) = &mut self.catalog else {
225 return self;
226 };
227 if catalog.info.is_shared() {
228 return self;
230 }
231 if self.kind != SourceNodeKind::CreateMViewOrBatch {
232 return self;
233 }
234
235 let prune = |col: &ColumnCatalog| col.is_hidden() && !col.is_row_id_column();
236
237 self.row_id_index = self.row_id_index.map(|idx| {
239 let mut cnt = 0;
240 for col in self.column_catalog.iter().take(idx + 1) {
241 if prune(col) {
242 cnt += 1;
243 }
244 }
245 idx - cnt
246 });
247 self.column_catalog.retain(|c| !prune(c));
248 self
249 }
250
251 pub fn exclude_generated_columns(mut self) -> (Self, Option<usize>) {
254 let original_row_id_index = self.row_id_index;
255 self.row_id_index = original_row_id_index.map(|idx| {
257 let mut cnt = 0;
258 for col in self.column_catalog.iter().take(idx + 1) {
259 if col.is_generated() {
260 cnt += 1;
261 }
262 }
263 idx - cnt
264 });
265 self.column_catalog.retain(|c| !c.is_generated());
266 (self, original_row_id_index)
267 }
268
269 pub fn infer_internal_table_catalog(is_distributed: bool) -> TableCatalog {
290 let mut builder = TableCatalogBuilder::default();
291
292 let key = Field {
293 data_type: DataType::Varchar,
294 name: "partition_id".to_owned(),
295 };
296 let value = Field {
297 data_type: DataType::Jsonb,
298 name: "offset_info".to_owned(),
299 };
300
301 let ordered_col_idx = builder.add_column(&key);
302 builder.add_column(&value);
303 builder.add_order_column(ordered_col_idx, OrderType::ascending());
304
305 builder.build(
306 if is_distributed {
307 vec![ordered_col_idx]
308 } else {
309 vec![]
310 },
311 1,
312 )
313 }
314}