risingwave_frontend/optimizer/plan_node/generic/
source.rs1use std::rc::Rc;
16
17use educe::Educe;
18use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Field, Schema};
19use risingwave_common::types::DataType;
20use risingwave_common::util::sort_util::OrderType;
21use risingwave_connector::WithPropertiesExt;
22use risingwave_sqlparser::ast::AsOf;
23
24use super::super::utils::TableCatalogBuilder;
25use super::GenericPlanNode;
26use crate::TableCatalog;
27use crate::catalog::source_catalog::SourceCatalog;
28use crate::optimizer::optimizer_context::OptimizerContextRef;
29use crate::optimizer::property::{FunctionalDependencySet, StreamKind};
30
/// What a [`Source`] plan node is being built for.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// Every variant starts with `Create`, which `clippy::enum_variant_names` would otherwise flag.
#[expect(clippy::enum_variant_names)]
pub enum SourceNodeKind {
    /// Source node built for a table with a connector.
    /// NOTE(review): inferred from the variant name — confirm against callers.
    CreateTable,
    /// Source node built for a shared source.
    /// NOTE(review): inferred from the variant name — confirm against callers.
    CreateSharedSource,
    /// Source node built for a materialized view or a batch query reading the source.
    ///
    /// This is the only kind for which `Source::exclude_iceberg_hidden_columns` prunes
    /// hidden columns; every other kind keeps its columns unchanged there.
    CreateMViewOrBatch,
}
46
/// Generic plan-node core for reading from a source; implements `GenericPlanNode`.
#[derive(Debug, Clone, Educe)]
// `ctx` is excluded from equality and hashing (see the field attributes below).
#[educe(PartialEq, Eq, Hash)]
pub struct Source {
    /// The source catalog, if any.
    ///
    /// When `None`, the node is treated as append-only and its stream key can only come
    /// from `row_id_index`. Connector checks (`is_kafka_connector`, etc.) return `false`.
    pub catalog: Option<Rc<SourceCatalog>>,

    /// The columns this node outputs, in output order; the node's schema is derived from
    /// them. These may differ from the catalog's columns, e.g. after
    /// `exclude_generated_columns` or `file_list_node` rewrites them.
    pub column_catalog: Vec<ColumnCatalog>,
    /// Position of the row-id column inside `column_catalog`, if there is one. When set,
    /// it alone forms the stream key.
    pub row_id_index: Option<usize>,

    /// What statement/plan this node was built for.
    pub kind: SourceNodeKind,

    /// Optimizer context; deliberately ignored by `PartialEq` and `Hash`.
    #[educe(PartialEq(ignore))]
    #[educe(Hash(ignore))]
    pub ctx: OptimizerContextRef,

    /// Time-travel specification for the read, if any (only iceberg sources report
    /// `support_time_travel`). NOTE(review): presumably parsed from an `AS OF` clause.
    pub as_of: Option<AsOf>,
}
68
69impl GenericPlanNode for Source {
70 fn schema(&self) -> Schema {
71 let fields = self
72 .column_catalog
73 .iter()
74 .map(|c| (&c.column_desc).into())
75 .collect();
76 Schema { fields }
77 }
78
79 fn stream_key(&self) -> Option<Vec<usize>> {
80 if let Some(idx) = self.row_id_index {
83 Some(vec![idx])
84 } else if let Some(catalog) = &self.catalog {
85 catalog
86 .pk_col_ids
87 .iter()
88 .map(|id| {
89 self.column_catalog
90 .iter()
91 .position(|c| c.column_id() == *id)
92 })
93 .collect::<Option<Vec<_>>>()
94 } else {
95 None
96 }
97 }
98
99 fn ctx(&self) -> OptimizerContextRef {
100 self.ctx.clone()
101 }
102
103 fn functional_dependency(&self) -> FunctionalDependencySet {
104 let pk_indices = self.stream_key();
105 match pk_indices {
106 Some(pk_indices) => {
107 debug_assert!(
108 pk_indices
109 .iter()
110 .all(|idx| *idx < self.column_catalog.len())
111 );
112 FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices)
113 }
114 None => FunctionalDependencySet::new(self.column_catalog.len()),
115 }
116 }
117}
118
119impl Source {
120 pub fn stream_kind(&self) -> StreamKind {
121 if let Some(catalog) = &self.catalog {
122 if catalog.append_only {
123 StreamKind::AppendOnly
124 } else {
125 StreamKind::Upsert
128 }
129 } else {
130 StreamKind::AppendOnly
132 }
133 }
134
135 pub fn file_list_node(core: Self) -> Self {
137 let column_catalog = if core.is_iceberg_connector() {
138 vec![
139 ColumnCatalog {
140 column_desc: ColumnDesc::from_field_with_column_id(
141 &Field {
142 name: "file_path".to_owned(),
143 data_type: DataType::Varchar,
144 },
145 0,
146 ),
147 is_hidden: false,
148 },
149 ColumnCatalog {
150 column_desc: ColumnDesc::from_field_with_column_id(
151 &Field {
152 name: "file_scan_task".to_owned(),
153 data_type: DataType::Jsonb,
154 },
155 1,
156 ),
157 is_hidden: false,
158 },
159 ]
160 } else if core.is_batch_connector() {
161 vec![
162 ColumnCatalog {
163 column_desc: ColumnDesc::from_field_with_column_id(
164 &Field {
165 name: "batch_task_id".to_owned(),
166 data_type: DataType::Varchar,
167 },
168 0,
169 ),
170 is_hidden: false,
171 },
172 ColumnCatalog {
173 column_desc: ColumnDesc::from_field_with_column_id(
174 &Field {
175 name: "batch_task_info".to_owned(),
176 data_type: DataType::Jsonb,
177 },
178 1,
179 ),
180 is_hidden: false,
181 },
182 ]
183 } else if core.is_new_fs_connector() {
184 vec![
185 ColumnCatalog {
186 column_desc: ColumnDesc::from_field_with_column_id(
187 &Field {
188 name: "filename".to_owned(),
189 data_type: DataType::Varchar,
190 },
191 0,
192 ),
193 is_hidden: false,
194 },
195 ColumnCatalog {
197 column_desc: ColumnDesc::from_field_with_column_id(
198 &Field {
199 name: "last_edit_time".to_owned(),
200 data_type: DataType::Timestamptz,
201 },
202 1,
203 ),
204 is_hidden: false,
205 },
206 ColumnCatalog {
207 column_desc: ColumnDesc::from_field_with_column_id(
208 &Field {
209 name: "file_size".to_owned(),
210 data_type: DataType::Int64,
211 },
212 2,
213 ),
214 is_hidden: false,
215 },
216 ]
217 } else {
218 unreachable!()
219 };
220 Self {
221 column_catalog,
222 row_id_index: None,
223 ..core
224 }
225 }
226
227 pub fn is_new_fs_connector(&self) -> bool {
228 self.catalog
229 .as_ref()
230 .is_some_and(|catalog| catalog.with_properties.is_new_fs_connector())
231 }
232
233 pub fn is_iceberg_connector(&self) -> bool {
234 self.catalog
235 .as_ref()
236 .is_some_and(|catalog| catalog.with_properties.is_iceberg_connector())
237 }
238
239 pub fn is_kafka_connector(&self) -> bool {
240 self.catalog
241 .as_ref()
242 .is_some_and(|catalog| catalog.with_properties.is_kafka_connector())
243 }
244
245 pub fn is_batch_connector(&self) -> bool {
246 self.catalog
247 .as_ref()
248 .is_some_and(|catalog| catalog.with_properties.is_batch_connector())
249 }
250
251 pub fn requires_singleton(&self) -> bool {
252 self.is_iceberg_connector() || self.is_batch_connector()
253 }
254
255 pub fn support_time_travel(&self) -> bool {
257 self.is_iceberg_connector()
258 }
259
260 pub fn exclude_iceberg_hidden_columns(mut self) -> Self {
261 let Some(catalog) = &mut self.catalog else {
262 return self;
263 };
264 if catalog.info.is_shared() {
265 return self;
267 }
268 if self.kind != SourceNodeKind::CreateMViewOrBatch {
269 return self;
270 }
271
272 let prune = |col: &ColumnCatalog| col.is_hidden() && !col.is_row_id_column();
273
274 self.row_id_index = self.row_id_index.map(|idx| {
276 let mut cnt = 0;
277 for col in self.column_catalog.iter().take(idx + 1) {
278 if prune(col) {
279 cnt += 1;
280 }
281 }
282 idx - cnt
283 });
284 self.column_catalog.retain(|c| !prune(c));
285 self
286 }
287
288 pub fn exclude_generated_columns(mut self) -> (Self, Option<usize>) {
291 let original_row_id_index = self.row_id_index;
292 self.row_id_index = original_row_id_index.map(|idx| {
294 let mut cnt = 0;
295 for col in self.column_catalog.iter().take(idx + 1) {
296 if col.is_generated() {
297 cnt += 1;
298 }
299 }
300 idx - cnt
301 });
302 self.column_catalog.retain(|c| !c.is_generated());
303 (self, original_row_id_index)
304 }
305
306 pub fn infer_internal_table_catalog(is_distributed: bool) -> TableCatalog {
327 let mut builder = TableCatalogBuilder::default();
328
329 let key = Field {
330 data_type: DataType::Varchar,
331 name: "partition_id".to_owned(),
332 };
333 let value = Field {
334 data_type: DataType::Jsonb,
335 name: "offset_info".to_owned(),
336 };
337
338 let ordered_col_idx = builder.add_column(&key);
339 builder.add_column(&value);
340 builder.add_order_column(ordered_col_idx, OrderType::ascending());
341
342 builder.build(
343 if is_distributed {
344 vec![ordered_col_idx]
345 } else {
346 vec![]
347 },
348 1,
349 )
350 }
351}