risingwave_frontend/optimizer/plan_node/logical_source.rs

use std::rc::Rc;

use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::bail;
use risingwave_common::catalog::{
    ColumnCatalog, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME,
    ICEBERG_SEQUENCE_NUM_COLUMN_NAME, ROW_ID_COLUMN_NAME,
};
use risingwave_pb::plan_common::GeneratedColumnDesc;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_sqlparser::ast::AsOf;

use super::generic::{GenericPlanRef, SourceNodeKind};
use super::stream_watermark_filter::StreamWatermarkFilter;
use super::utils::{Distill, childless_record};
use super::{
    BatchProject, BatchSource, ColPrunable, ExprRewritable, Logical, LogicalFilter,
    LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PredicatePushdown, StreamPlanRef,
    StreamProject, StreamRowIdGen, StreamSource, StreamSourceScan, ToBatch, ToStream, generic,
};
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::Result;
use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::stream_fs_fetch::StreamFsFetch;
use crate::optimizer::plan_node::utils::column_names_pretty;
use crate::optimizer::plan_node::{
    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamDedup,
    ToStreamContext,
};
use crate::optimizer::property::Distribution::HashShard;
use crate::optimizer::property::{
    Distribution, MonotonicityMap, RequiredDist, StreamKind, WatermarkColumns,
};
use crate::utils::{ColIndexMapping, Condition, IndexRewriter};

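/// `LogicalSource` is the leaf logical plan node for reading from a source
/// (a connector-backed stream, an iceberg table, or a set of files).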
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalSource {
    pub base: PlanBase<Logical>,
    pub core: generic::Source,

    /// Expressions to produce the user-visible output. Present only when the
    /// catalog contains generated columns; turned into a `Project` on top of
    /// the source during `to_batch`/`to_stream`.
    pub(crate) output_exprs: Option<Vec<ExprImpl>>,
    /// Index of the row-id column in the output schema. Because generated
    /// columns are excluded from `core`, this can differ from `core.row_id_index`.
    pub(crate) output_row_id_index: Option<usize>,
}

impl LogicalSource {
    pub fn new(
        source_catalog: Option<Rc<SourceCatalog>>,
        column_catalog: Vec<ColumnCatalog>,
        row_id_index: Option<usize>,
        kind: SourceNodeKind,
        ctx: OptimizerContextRef,
        as_of: Option<AsOf>,
    ) -> Result<Self> {
        let core = generic::Source {
            catalog: source_catalog,
            column_catalog,
            row_id_index,
            kind,
            ctx,
            as_of,
        };

        if core.as_of.is_some() && !core.support_time_travel() {
            bail!("Time travel is not supported for the source")
        }

        let base = PlanBase::new_logical_with_core(&core);

        let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?;
        let (core, output_row_id_index) = core.exclude_generated_columns();

        Ok(LogicalSource {
            base,
            core,
            output_exprs,
            output_row_id_index,
        })
    }

    pub fn with_catalog(
        source_catalog: Rc<SourceCatalog>,
        kind: SourceNodeKind,
        ctx: OptimizerContextRef,
        as_of: Option<AsOf>,
    ) -> Result<Self> {
        let column_catalogs = source_catalog.columns.clone();
        let row_id_index = source_catalog.row_id_index;
        // Only append-only sources carry a row-id column.
        if !source_catalog.append_only {
            assert!(row_id_index.is_none());
        }

        Self::new(
            Some(source_catalog),
            column_catalogs,
            row_id_index,
            kind,
            ctx,
            as_of,
        )
    }

    /// Derives output expressions when the catalog contains generated columns.
    /// Returns `None` if there is no generated column. Otherwise the returned
    /// expressions cover all columns: non-generated columns become `InputRef`s
    /// into the compacted (generated-columns-excluded) schema, and generated
    /// columns become their defining expressions, rewritten and cast.
    pub fn derive_output_exprs_from_generated_columns(
        columns: &[ColumnCatalog],
    ) -> Result<Option<Vec<ExprImpl>>> {
        if !columns.iter().any(|c| c.is_generated()) {
            return Ok(None);
        }

        // Map each non-generated column's original index to its index after
        // generated columns are excluded; generated columns map to `None`.
        let col_mapping = {
            let mut mapping = vec![None; columns.len()];
            let mut cur = 0;
            for (idx, column) in columns.iter().enumerate() {
                if !column.is_generated() {
                    mapping[idx] = Some(cur);
                    cur += 1;
                } else {
                    mapping[idx] = None;
                }
            }
            ColIndexMapping::new(mapping, columns.len())
        };

        let mut rewriter = IndexRewriter::new(col_mapping);
        let mut exprs = Vec::with_capacity(columns.len());
        let mut cur = 0;
        for column in columns {
            let column_desc = &column.column_desc;
            let ret_data_type = column_desc.data_type.clone();

            if let Some(GeneratedOrDefaultColumn::GeneratedColumn(generated_column)) =
                &column_desc.generated_or_default_column
            {
                let GeneratedColumnDesc { expr } = generated_column;
                // Rewrite the expression's input refs onto the compacted
                // schema, then cast it to the column's declared type.
                let proj_expr =
                    rewriter.rewrite_expr(ExprImpl::from_expr_proto(expr.as_ref().unwrap())?);
                let casted_expr = proj_expr.cast_assign(&ret_data_type)?;
                exprs.push(casted_expr);
            } else {
                let input_ref = InputRef {
                    data_type: ret_data_type,
                    index: cur,
                };
                cur += 1;
                exprs.push(ExprImpl::InputRef(Box::new(input_ref)));
            }
        }

        Ok(Some(exprs))
    }
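
    // Illustrative example (ours): for columns [v1 INT, g1 INT AS v1 + 1, v2 INT],
    // the compacted schema is [v1, v2] and the derived exprs are
    // [InputRef(0), InputRef(0) + 1, InputRef(1)], so a Project on top restores
    // the user-visible column order and values.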

    fn create_non_shared_source_plan(core: generic::Source) -> Result<StreamPlanRef> {
        let mut plan;
        if core.is_new_fs_connector() {
            // New filesystem connectors list files first, then fetch them.
            plan = Self::create_list_plan(core.clone(), true)?;
            plan = StreamFsFetch::new(plan, core).into();
        } else if core.is_iceberg_connector() || core.is_batch_connector() {
            plan = Self::create_list_plan(core.clone(), false)?;
            plan = StreamFsFetch::new(plan, core).into();
        } else {
            plan = StreamSource::new(core).into()
        }
        Ok(plan)
    }
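
    // Shape sketch (ours): file-based connectors produce
    //   StreamSource(list) -> Exchange -> [StreamDedup] -> StreamFsFetch,
    // with the StreamDedup stage present only for new filesystem connectors;
    // all other connectors compile to a bare StreamSource.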

    /// Builds the file-list plan: a single-distribution `StreamSource` that
    /// emits the file list, enforced to hash-shard on the first column and
    /// optionally deduplicated on that column.
    fn create_list_plan(core: generic::Source, dedup: bool) -> Result<StreamPlanRef> {
        let downstream_columns = core.column_catalog.clone();
        let logical_source = generic::Source::file_list_node(core);
        let mut list_plan: StreamPlanRef = StreamSource {
            base: PlanBase::new_stream_with_core(
                &logical_source,
                Distribution::Single,
                StreamKind::AppendOnly,
                false, // assumed to be the emit-on-window-close flag
                WatermarkColumns::new(),
                MonotonicityMap::new(),
            ),
            core: logical_source,
            downstream_columns: Some(downstream_columns),
        }
        .into();
        list_plan = RequiredDist::shard_by_key(list_plan.schema().len(), &[0])
            .streaming_enforce_if_not_satisfies(list_plan)?;
        if dedup {
            list_plan = StreamDedup::new(generic::Dedup {
                input: list_plan,
                dedup_cols: vec![0],
            })
            .into();
        }

        Ok(list_plan)
    }
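
    // Design note (our reading): deduplicating on column 0, presumably the
    // file path emitted by the list node, keeps a re-listed file from being
    // fetched twice.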

    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
        self.core.catalog.clone()
    }

    pub fn clone_with_column_catalog(&self, column_catalog: Vec<ColumnCatalog>) -> Result<Self> {
        let row_id_index = column_catalog.iter().position(|c| c.is_row_id_column());
        let kind = self.core.kind.clone();
        let ctx = self.core.ctx.clone();
        let as_of = self.core.as_of.clone();
        Self::new(
            self.source_catalog(),
            column_catalog,
            row_id_index,
            kind,
            ctx,
            as_of,
        )
    }

    fn prune_col_for_iceberg_source(&self, required_cols: &[usize]) -> PlanRef {
        assert!(self.core.is_iceberg_connector());
        let schema_len = self.schema().len();
        // The last four columns of an iceberg source are fixed: three hidden
        // iceberg columns (sequence number, file path, file position) followed
        // by the row-id column.
        assert!(
            schema_len >= 4,
            "Iceberg source must have at least 4 columns (3 iceberg hidden + 1 row_id)"
        );

        assert_eq!(
            self.core.column_catalog[schema_len - 4].name(),
            ICEBERG_SEQUENCE_NUM_COLUMN_NAME
        );
        assert_eq!(
            self.core.column_catalog[schema_len - 3].name(),
            ICEBERG_FILE_PATH_COLUMN_NAME
        );
        assert_eq!(
            self.core.column_catalog[schema_len - 2].name(),
            ICEBERG_FILE_POS_COLUMN_NAME
        );
        assert_eq!(
            self.core.column_catalog[schema_len - 1].name(),
            ROW_ID_COLUMN_NAME
        );
        assert_eq!(self.output_row_id_index, Some(self.schema().len() - 1));

        let iceberg_start_idx = schema_len - 4;
        let row_id_idx = schema_len - 1;

        // Keep the required user columns, in the required order ...
        let mut source_cols = Vec::new();
        for &idx in required_cols {
            if idx < iceberg_start_idx {
                source_cols.push(idx);
            }
        }

        // ... and always retain the hidden iceberg columns and the row id.
        source_cols.extend([
            iceberg_start_idx,
            iceberg_start_idx + 1,
            iceberg_start_idx + 2,
            row_id_idx,
        ]);

        // Rebuild the source with only the retained columns; the row id is now
        // the last retained column.
        let mut core = self.core.clone();
        core.column_catalog = source_cols
            .iter()
            .map(|idx| core.column_catalog[*idx].clone())
            .collect();
        core.row_id_index = Some(source_cols.len() - 1);

        let base = PlanBase::new_logical_with_core(&core);
        let output_exprs =
            Self::derive_output_exprs_from_generated_columns(&core.column_catalog).unwrap();
        let (core, _) = core.exclude_generated_columns();

        let pruned_source = LogicalSource {
            base,
            core,
            output_exprs,
            output_row_id_index: Some(source_cols.len() - 1),
        };

        // Translate the caller's required indices into the pruned schema.
        let mut old_to_new = vec![None; self.schema().len()];
        for (new_idx, &old_idx) in source_cols.iter().enumerate() {
            old_to_new[old_idx] = Some(new_idx);
        }

        let new_required: Vec<_> = required_cols
            .iter()
            .map(|&old_idx| old_to_new[old_idx].unwrap())
            .collect();

        let mapping =
            ColIndexMapping::with_remaining_columns(&new_required, pruned_source.schema().len());
        LogicalProject::with_mapping(pruned_source.into(), mapping).into()
    }
}
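
// Worked example (ours): with user columns [a, b, c] followed by the three
// hidden iceberg columns and the row id, and required_cols = [2, 0],
// source_cols becomes [2, 0, 3, 4, 5, 6]: the pruned source reads
// [c, a, <3 hidden>, <row id>] and the projection on top emits [c, a].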

impl_plan_tree_node_for_leaf! { Logical, LogicalSource }
impl Distill for LogicalSource {
    fn distill<'a>(&self) -> XmlNode<'a> {
        let fields = if let Some(catalog) = self.source_catalog() {
            let src = Pretty::from(catalog.name.clone());
            let mut fields = vec![
                ("source", src),
                ("columns", column_names_pretty(self.schema())),
            ];
            if let Some(as_of) = &self.core.as_of {
                fields.push(("as_of", Pretty::debug(as_of)));
            }
            fields
        } else {
            vec![]
        };
        childless_record("LogicalSource", fields)
    }
}
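
// Illustrative rendering (ours): for a source `s` with columns v1 and v2, the
// record above distills to roughly `LogicalSource { source: s, columns: [v1, v2] }`.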

impl ColPrunable for LogicalSource {
    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
        if self.core.is_iceberg_connector() {
            self.prune_col_for_iceberg_source(required_cols)
        } else {
            // For non-iceberg sources, prune by projecting the required columns
            // on top of the unchanged source.
            let mapping =
                ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
            LogicalProject::with_mapping(self.clone().into(), mapping).into()
        }
    }
}

impl ExprRewritable<Logical> for LogicalSource {
    fn has_rewritable_expr(&self) -> bool {
        self.output_exprs.is_some()
    }

    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        let mut output_exprs = self.output_exprs.clone();

        for expr in output_exprs.iter_mut().flatten() {
            *expr = r.rewrite_expr(expr.clone());
        }

        Self {
            output_exprs,
            ..self.clone()
        }
        .into()
    }
}

impl ExprVisitable for LogicalSource {
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        self.output_exprs
            .iter()
            .flatten()
            .for_each(|e| v.visit_expr(e));
    }
}

impl PredicatePushdown for LogicalSource {
    fn predicate_pushdown(
        &self,
        predicate: Condition,
        _ctx: &mut PredicatePushdownContext,
    ) -> PlanRef {
        // A source cannot absorb predicates, so keep them in a filter on top.
        LogicalFilter::create(self.clone().into(), predicate)
    }
}

impl ToBatch for LogicalSource {
    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
        assert!(
            !self.core.is_kafka_connector(),
            "LogicalSource with a kafka property should be converted to LogicalKafkaScan"
        );
        assert!(
            !self.core.is_iceberg_connector(),
            "LogicalSource with an iceberg property should be converted to LogicalIcebergScan"
        );
        let mut plan = BatchSource::new(self.core.clone()).into();

        if let Some(exprs) = &self.output_exprs {
            let logical_project = generic::Project::new(exprs.clone(), plan);
            plan = BatchProject::new(logical_project).into();
        }

        Ok(plan)
    }
}
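
// Batch shape (ours): a bare BatchSource, with a BatchProject on top only when
// `output_exprs` is `Some`, i.e. when the source declares generated columns.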

impl ToStream for LogicalSource {
    fn to_stream(
        &self,
        _ctx: &mut ToStreamContext,
    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
        let mut plan;

        match self.core.kind {
            SourceNodeKind::CreateTable | SourceNodeKind::CreateSharedSource => {
                // Unlike `CreateMViewOrBatch` below, no projection, watermark
                // filter, or row-id generation is added here.
                plan = Self::create_non_shared_source_plan(self.core.clone())?;
            }
            SourceNodeKind::CreateMViewOrBatch => {
                // A shared source is scanned with `StreamSourceScan`; otherwise
                // build a standalone (non-shared) source plan.
                let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared());
                if use_shared_source {
                    plan = StreamSourceScan::new(self.core.clone()).into();
                } else {
                    plan = Self::create_non_shared_source_plan(self.core.clone())?;
                }

                if let Some(exprs) = &self.output_exprs {
                    let logical_project = generic::Project::new(exprs.clone(), plan);
                    plan = StreamProject::new(logical_project).into();
                }

                if let Some(catalog) = self.source_catalog()
                    && !catalog.watermark_descs.is_empty()
                {
                    plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
                }

                if let Some(row_id_index) = self.output_row_id_index {
                    plan = StreamRowIdGen::new_with_dist(
                        plan,
                        row_id_index,
                        HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }
        Ok(plan)
    }
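
    // Shape sketch (ours) of the `CreateMViewOrBatch` arm above:
    //   StreamSource / StreamSourceScan
    //     -> StreamProject          (only if there are generated columns)
    //     -> StreamWatermarkFilter  (only if watermarks are declared)
    //     -> StreamRowIdGen         (only if there is a row-id column; hash-sharded on it)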

    fn logical_rewrite_for_stream(
        &self,
        _ctx: &mut RewriteStreamContext,
    ) -> Result<(PlanRef, ColIndexMapping)> {
        Ok((
            self.clone().into(),
            ColIndexMapping::identity(self.schema().len()),
        ))
    }
}