risingwave_frontend/optimizer/plan_node/
logical_source.rs1use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::bail;
19use risingwave_common::catalog::{
20 ColumnCatalog, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME,
21 ICEBERG_SEQUENCE_NUM_COLUMN_NAME, ROW_ID_COLUMN_NAME,
22};
23use risingwave_pb::plan_common::GeneratedColumnDesc;
24use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
25use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;
26use risingwave_sqlparser::ast::AsOf;
27
28use super::generic::{GenericPlanRef, SourceNodeKind};
29use super::stream_watermark_filter::StreamWatermarkFilter;
30use super::utils::{Distill, childless_record};
31use super::{
32 BatchProject, BatchSource, ColPrunable, ExprRewritable, Logical, LogicalFilter,
33 LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PredicatePushdown, StreamPlanRef,
34 StreamProject, StreamRowIdGen, StreamSource, StreamSourceScan, ToBatch, ToStream, generic,
35};
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::error::Result;
38use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef};
39use crate::optimizer::optimizer_context::OptimizerContextRef;
40use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
41use crate::optimizer::plan_node::stream_fs_fetch::StreamFsFetch;
42use crate::optimizer::plan_node::utils::column_names_pretty;
43use crate::optimizer::plan_node::{
44 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamDedup,
45 ToStreamContext,
46};
47use crate::optimizer::property::Distribution::HashShard;
48use crate::optimizer::property::{
49 Distribution, MonotonicityMap, RequiredDist, StreamKind, WatermarkColumns,
50};
51use crate::utils::{ColIndexMapping, Condition, IndexRewriter};
52
53#[derive(Debug, Clone, PartialEq, Eq, Hash)]
55pub struct LogicalSource {
56 pub base: PlanBase<Logical>,
57 pub core: generic::Source,
58
59 pub(crate) output_exprs: Option<Vec<ExprImpl>>,
62 pub(crate) output_row_id_index: Option<usize>,
65}
66
67impl LogicalSource {
68 pub fn new(
69 source_catalog: Option<Rc<SourceCatalog>>,
70 column_catalog: Vec<ColumnCatalog>,
71 row_id_index: Option<usize>,
72 kind: SourceNodeKind,
73 ctx: OptimizerContextRef,
74 as_of: Option<AsOf>,
75 ) -> Result<Self> {
76 let core = generic::Source {
83 catalog: source_catalog,
84 column_catalog,
85 row_id_index,
86 kind,
87 ctx,
88 as_of,
89 };
90
91 if core.as_of.is_some() && !core.support_time_travel() {
92 bail!("Time travel is not supported for the source")
93 }
94
95 let base = PlanBase::new_logical_with_core(&core);
96
97 let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?;
98 let (core, output_row_id_index) = core.exclude_generated_columns();
99
100 Ok(LogicalSource {
101 base,
102 core,
103 output_exprs,
104 output_row_id_index,
105 })
106 }
107
108 pub fn with_catalog(
109 source_catalog: Rc<SourceCatalog>,
110 kind: SourceNodeKind,
111 ctx: OptimizerContextRef,
112 as_of: Option<AsOf>,
113 ) -> Result<Self> {
114 let column_catalogs = source_catalog.columns.clone();
115 let row_id_index = source_catalog.row_id_index;
116 if !source_catalog.append_only {
117 assert!(row_id_index.is_none());
118 }
119
120 Self::new(
121 Some(source_catalog),
122 column_catalogs,
123 row_id_index,
124 kind,
125 ctx,
126 as_of,
127 )
128 }
129
130 pub fn derive_output_exprs_from_generated_columns(
135 columns: &[ColumnCatalog],
136 ) -> Result<Option<Vec<ExprImpl>>> {
137 if !columns.iter().any(|c| c.is_generated()) {
138 return Ok(None);
139 }
140
141 let col_mapping = {
142 let mut mapping = vec![None; columns.len()];
143 let mut cur = 0;
144 for (idx, column) in columns.iter().enumerate() {
145 if !column.is_generated() {
146 mapping[idx] = Some(cur);
147 cur += 1;
148 } else {
149 mapping[idx] = None;
150 }
151 }
152 ColIndexMapping::new(mapping, columns.len())
153 };
154
155 let mut rewriter = IndexRewriter::new(col_mapping);
156 let mut exprs = Vec::with_capacity(columns.len());
157 let mut cur = 0;
158 for column in columns {
159 let column_desc = &column.column_desc;
160 let ret_data_type = column_desc.data_type.clone();
161
162 if let Some(GeneratedOrDefaultColumn::GeneratedColumn(generated_column)) =
163 &column_desc.generated_or_default_column
164 {
165 let GeneratedColumnDesc { expr } = generated_column;
166 let proj_expr =
168 rewriter.rewrite_expr(ExprImpl::from_expr_proto(expr.as_ref().unwrap())?);
169 let casted_expr = proj_expr.cast_assign(&ret_data_type)?;
170 exprs.push(casted_expr);
171 } else {
172 let input_ref = InputRef {
173 data_type: ret_data_type,
174 index: cur,
175 };
176 cur += 1;
177 exprs.push(ExprImpl::InputRef(Box::new(input_ref)));
178 }
179 }
180
181 Ok(Some(exprs))
182 }
183
184 fn create_non_shared_source_plan(core: generic::Source) -> Result<StreamPlanRef> {
185 let mut plan;
186 if core.is_new_fs_connector() {
187 plan = Self::create_list_plan(core.clone(), true)?;
188 plan = StreamFsFetch::new(plan, core).into();
189 } else if core.is_iceberg_connector() || core.is_batch_connector() {
190 plan = Self::create_list_plan(core.clone(), false)?;
191 plan = StreamFsFetch::new(plan, core).into();
192 } else {
193 plan = StreamSource::new(core).into()
194 }
195 Ok(plan)
196 }
197
198 fn create_list_plan(core: generic::Source, dedup: bool) -> Result<StreamPlanRef> {
200 let downstream_columns = core.column_catalog.clone();
201 let logical_source = generic::Source::file_list_node(core);
202 let mut list_plan: StreamPlanRef = StreamSource {
203 base: PlanBase::new_stream_with_core(
204 &logical_source,
205 Distribution::Single,
206 StreamKind::AppendOnly, false,
208 WatermarkColumns::new(),
209 MonotonicityMap::new(),
210 ),
211 core: logical_source,
212 downstream_columns: Some(downstream_columns),
213 }
214 .into();
215 list_plan = RequiredDist::shard_by_key(list_plan.schema().len(), &[0])
216 .streaming_enforce_if_not_satisfies(list_plan)?;
217 if dedup {
218 list_plan = StreamDedup::new(generic::Dedup {
219 input: list_plan,
220 dedup_cols: vec![0],
221 })
222 .into();
223 }
224
225 Ok(list_plan)
226 }
227
228 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
229 self.core.catalog.clone()
230 }
231
232 pub fn clone_with_column_catalog(&self, column_catalog: Vec<ColumnCatalog>) -> Result<Self> {
233 let row_id_index = column_catalog.iter().position(|c| c.is_row_id_column());
234 let kind = self.core.kind.clone();
235 let ctx = self.core.ctx.clone();
236 let as_of = self.core.as_of.clone();
237 Self::new(
238 self.source_catalog(),
239 column_catalog,
240 row_id_index,
241 kind,
242 ctx,
243 as_of,
244 )
245 }
246
247 fn prune_col_for_iceberg_source(&self, required_cols: &[usize]) -> PlanRef {
248 assert!(self.core.is_iceberg_connector());
249 let schema_len = self.schema().len();
254 assert!(
255 schema_len >= 4,
256 "Iceberg source must have at least 4 columns (3 iceberg hidden + 1 row_id)"
257 );
258
259 assert_eq!(
260 self.core.column_catalog[schema_len - 4].name(),
261 ICEBERG_SEQUENCE_NUM_COLUMN_NAME
262 );
263 assert_eq!(
264 self.core.column_catalog[schema_len - 3].name(),
265 ICEBERG_FILE_PATH_COLUMN_NAME
266 );
267 assert_eq!(
268 self.core.column_catalog[schema_len - 2].name(),
269 ICEBERG_FILE_POS_COLUMN_NAME
270 );
271 assert_eq!(
272 self.core.column_catalog[schema_len - 1].name(),
273 ROW_ID_COLUMN_NAME
274 );
275 assert_eq!(self.output_row_id_index, Some(self.schema().len() - 1));
276
277 let iceberg_start_idx = schema_len - 4;
278 let row_id_idx = schema_len - 1;
279
280 let mut source_cols = Vec::new();
282
283 for &idx in required_cols {
285 if idx < iceberg_start_idx {
286 source_cols.push(idx);
288 }
289 }
290
291 source_cols.extend([
293 iceberg_start_idx,
294 iceberg_start_idx + 1,
295 iceberg_start_idx + 2,
296 row_id_idx,
297 ]);
298
299 let mut core = self.core.clone();
301 core.column_catalog = source_cols
302 .iter()
303 .map(|idx| core.column_catalog[*idx].clone())
304 .collect();
305 core.row_id_index = Some(source_cols.len() - 1);
307
308 let base = PlanBase::new_logical_with_core(&core);
309 let output_exprs =
310 Self::derive_output_exprs_from_generated_columns(&core.column_catalog).unwrap();
311 let (core, _) = core.exclude_generated_columns();
312
313 let pruned_source = LogicalSource {
314 base,
315 core,
316 output_exprs,
317 output_row_id_index: Some(source_cols.len() - 1),
318 };
319
320 let mut old_to_new = vec![None; self.schema().len()];
322 for (new_idx, &old_idx) in source_cols.iter().enumerate() {
323 old_to_new[old_idx] = Some(new_idx);
324 }
325
326 let new_required: Vec<_> = required_cols
328 .iter()
329 .map(|&old_idx| old_to_new[old_idx].unwrap())
330 .collect();
331
332 let mapping =
333 ColIndexMapping::with_remaining_columns(&new_required, pruned_source.schema().len());
334 LogicalProject::with_mapping(pruned_source.into(), mapping).into()
335 }
336}
337
338impl_plan_tree_node_for_leaf! { Logical, LogicalSource}
339impl Distill for LogicalSource {
340 fn distill<'a>(&self) -> XmlNode<'a> {
341 let fields = if let Some(catalog) = self.source_catalog() {
342 let src = Pretty::from(catalog.name.clone());
343 let mut fields = vec![
344 ("source", src),
345 ("columns", column_names_pretty(self.schema())),
346 ];
347 if let Some(as_of) = &self.core.as_of {
348 fields.push(("as_of", Pretty::debug(as_of)));
349 }
350 fields
351 } else {
352 vec![]
353 };
354 childless_record("LogicalSource", fields)
355 }
356}
357
358impl ColPrunable for LogicalSource {
359 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
360 let is_refreshable_iceberg = self.source_catalog().is_some_and(|catalog| {
361 catalog.refresh_mode.is_some_and(|refresh_mode| {
362 matches!(refresh_mode.refresh_mode, Some(RefreshMode::FullReload(_)))
363 })
364 }); if self.core.is_iceberg_connector() && !is_refreshable_iceberg {
367 self.prune_col_for_iceberg_source(required_cols)
368 } else {
369 let mapping =
371 ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
372 LogicalProject::with_mapping(self.clone().into(), mapping).into()
373 }
374 }
375}
376
377impl ExprRewritable<Logical> for LogicalSource {
378 fn has_rewritable_expr(&self) -> bool {
379 self.output_exprs.is_some()
380 }
381
382 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
383 let mut output_exprs = self.output_exprs.clone();
384
385 for expr in output_exprs.iter_mut().flatten() {
386 *expr = r.rewrite_expr(expr.clone());
387 }
388
389 Self {
390 output_exprs,
391 ..self.clone()
392 }
393 .into()
394 }
395}
396
397impl ExprVisitable for LogicalSource {
398 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
399 self.output_exprs
400 .iter()
401 .flatten()
402 .for_each(|e| v.visit_expr(e));
403 }
404}
405
406impl PredicatePushdown for LogicalSource {
407 fn predicate_pushdown(
408 &self,
409 predicate: Condition,
410 _ctx: &mut PredicatePushdownContext,
411 ) -> PlanRef {
412 LogicalFilter::create(self.clone().into(), predicate)
413 }
414}
415
416impl ToBatch for LogicalSource {
417 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
418 assert!(
419 !self.core.is_kafka_connector(),
420 "LogicalSource with a kafka property should be converted to LogicalKafkaScan"
421 );
422 assert!(
423 !self.core.is_iceberg_connector(),
424 "LogicalSource with a iceberg property should be converted to LogicalIcebergScan"
425 );
426 let mut plan = BatchSource::new(self.core.clone()).into();
427
428 if let Some(exprs) = &self.output_exprs {
429 let logical_project = generic::Project::new(exprs.clone(), plan);
430 plan = BatchProject::new(logical_project).into();
431 }
432
433 Ok(plan)
434 }
435}
436
437impl ToStream for LogicalSource {
438 fn to_stream(
439 &self,
440 _ctx: &mut ToStreamContext,
441 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
442 let mut plan;
443
444 match self.core.kind {
445 SourceNodeKind::CreateTable | SourceNodeKind::CreateSharedSource => {
446 plan = Self::create_non_shared_source_plan(self.core.clone())?;
449 }
450 SourceNodeKind::CreateMViewOrBatch => {
451 let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared());
455 if use_shared_source {
456 plan = StreamSourceScan::new(self.core.clone()).into();
457 } else {
458 plan = Self::create_non_shared_source_plan(self.core.clone())?;
460 }
461
462 if let Some(exprs) = &self.output_exprs {
463 let logical_project = generic::Project::new(exprs.clone(), plan);
464 plan = StreamProject::new(logical_project).into();
465 }
466
467 if let Some(catalog) = self.source_catalog()
468 && !catalog.watermark_descs.is_empty()
469 {
470 plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
471 }
472
473 if let Some(row_id_index) = self.output_row_id_index {
474 plan = StreamRowIdGen::new_with_dist(
475 plan,
476 row_id_index,
477 HashShard(vec![row_id_index]),
478 )
479 .into();
480 }
481 }
482 }
483 Ok(plan)
484 }
485
486 fn logical_rewrite_for_stream(
487 &self,
488 _ctx: &mut RewriteStreamContext,
489 ) -> Result<(PlanRef, ColIndexMapping)> {
490 Ok((
491 self.clone().into(),
492 ColIndexMapping::identity(self.schema().len()),
493 ))
494 }
495}