risingwave_frontend/optimizer/plan_node/
logical_source.rs1use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::bail;
19use risingwave_common::catalog::ColumnCatalog;
20use risingwave_pb::plan_common::GeneratedColumnDesc;
21use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
22use risingwave_sqlparser::ast::AsOf;
23
24use super::generic::{GenericPlanRef, SourceNodeKind};
25use super::stream_watermark_filter::StreamWatermarkFilter;
26use super::utils::{Distill, childless_record};
27use super::{
28 BatchProject, BatchSource, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject,
29 PlanBase, PlanRef, PredicatePushdown, StreamProject, StreamRowIdGen, StreamSource,
30 StreamSourceScan, ToBatch, ToStream, generic,
31};
32use crate::catalog::source_catalog::SourceCatalog;
33use crate::error::Result;
34use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef};
35use crate::optimizer::optimizer_context::OptimizerContextRef;
36use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
37use crate::optimizer::plan_node::stream_fs_fetch::StreamFsFetch;
38use crate::optimizer::plan_node::utils::column_names_pretty;
39use crate::optimizer::plan_node::{
40 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamDedup,
41 ToStreamContext,
42};
43use crate::optimizer::property::Distribution::HashShard;
44use crate::optimizer::property::{
45 Distribution, MonotonicityMap, Order, RequiredDist, WatermarkColumns,
46};
47use crate::utils::{ColIndexMapping, Condition, IndexRewriter};
48
/// Logical plan node that reads from a source and has no plan inputs.
///
/// `core` only describes the columns that are physically read: generated columns are removed
/// from it by [`LogicalSource::new`] and recomputed through `output_exprs`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalSource {
    /// Plan base, derived in `new` from the core *before* generated columns are excluded.
    pub base: PlanBase<Logical>,
    /// Source description with generated columns excluded.
    pub core: generic::Source,

    /// Projection that rebuilds the full column list (including generated columns) on top of
    /// the columns read by `core`; `None` when the source has no generated columns.
    pub(crate) output_exprs: Option<Vec<ExprImpl>>,
    /// Row-id column index returned by `generic::Source::exclude_generated_columns`. When set,
    /// `to_stream` (in the `CreateMViewOrBatch` case) adds a `StreamRowIdGen` on this column.
    pub(crate) output_row_id_index: Option<usize>,
}
62
63impl LogicalSource {
64 pub fn new(
65 source_catalog: Option<Rc<SourceCatalog>>,
66 column_catalog: Vec<ColumnCatalog>,
67 row_id_index: Option<usize>,
68 kind: SourceNodeKind,
69 ctx: OptimizerContextRef,
70 as_of: Option<AsOf>,
71 ) -> Result<Self> {
72 let core = generic::Source {
79 catalog: source_catalog,
80 column_catalog,
81 row_id_index,
82 kind,
83 ctx,
84 as_of,
85 };
86
87 if core.as_of.is_some() && !core.support_time_travel() {
88 bail!("Time travel is not supported for the source")
89 }
90
91 let base = PlanBase::new_logical_with_core(&core);
92
93 let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?;
94 let (core, output_row_id_index) = core.exclude_generated_columns();
95
96 Ok(LogicalSource {
97 base,
98 core,
99 output_exprs,
100 output_row_id_index,
101 })
102 }
103
104 pub fn with_catalog(
105 source_catalog: Rc<SourceCatalog>,
106 kind: SourceNodeKind,
107 ctx: OptimizerContextRef,
108 as_of: Option<AsOf>,
109 ) -> Result<Self> {
110 let column_catalogs = source_catalog.columns.clone();
111 let row_id_index = source_catalog.row_id_index;
112 if !source_catalog.append_only {
113 assert!(row_id_index.is_none());
114 }
115
116 Self::new(
117 Some(source_catalog),
118 column_catalogs,
119 row_id_index,
120 kind,
121 ctx,
122 as_of,
123 )
124 }
125
126 pub fn derive_output_exprs_from_generated_columns(
131 columns: &[ColumnCatalog],
132 ) -> Result<Option<Vec<ExprImpl>>> {
133 if !columns.iter().any(|c| c.is_generated()) {
134 return Ok(None);
135 }
136
137 let col_mapping = {
138 let mut mapping = vec![None; columns.len()];
139 let mut cur = 0;
140 for (idx, column) in columns.iter().enumerate() {
141 if !column.is_generated() {
142 mapping[idx] = Some(cur);
143 cur += 1;
144 } else {
145 mapping[idx] = None;
146 }
147 }
148 ColIndexMapping::new(mapping, columns.len())
149 };
150
151 let mut rewriter = IndexRewriter::new(col_mapping);
152 let mut exprs = Vec::with_capacity(columns.len());
153 let mut cur = 0;
154 for column in columns {
155 let column_desc = &column.column_desc;
156 let ret_data_type = column_desc.data_type.clone();
157
158 if let Some(GeneratedOrDefaultColumn::GeneratedColumn(generated_column)) =
159 &column_desc.generated_or_default_column
160 {
161 let GeneratedColumnDesc { expr } = generated_column;
162 let proj_expr =
164 rewriter.rewrite_expr(ExprImpl::from_expr_proto(expr.as_ref().unwrap())?);
165 let casted_expr = proj_expr.cast_assign(ret_data_type)?;
166 exprs.push(casted_expr);
167 } else {
168 let input_ref = InputRef {
169 data_type: ret_data_type,
170 index: cur,
171 };
172 cur += 1;
173 exprs.push(ExprImpl::InputRef(Box::new(input_ref)));
174 }
175 }
176
177 Ok(Some(exprs))
178 }
179
180 fn create_non_shared_source_plan(core: generic::Source) -> Result<PlanRef> {
181 let mut plan: PlanRef;
182 if core.is_new_fs_connector() {
183 plan = Self::create_list_plan(core.clone(), true)?;
184 plan = StreamFsFetch::new(plan, core.clone()).into();
185 } else if core.is_iceberg_connector() {
186 plan = Self::create_list_plan(core.clone(), false)?;
187 plan = StreamFsFetch::new(plan, core.clone()).into();
188 } else {
189 plan = StreamSource::new(core.clone()).into()
190 }
191 Ok(plan)
192 }
193
194 fn create_list_plan(core: generic::Source, dedup: bool) -> Result<PlanRef> {
196 let logical_source = generic::Source::file_list_node(core);
197 let mut list_plan: PlanRef = StreamSource {
198 base: PlanBase::new_stream_with_core(
199 &logical_source,
200 Distribution::Single,
201 true, false,
203 WatermarkColumns::new(),
204 MonotonicityMap::new(),
205 ),
206 core: logical_source,
207 }
208 .into();
209 list_plan = RequiredDist::shard_by_key(list_plan.schema().len(), &[0])
210 .enforce_if_not_satisfies(list_plan, &Order::any())?;
211 if dedup {
212 list_plan = StreamDedup::new(generic::Dedup {
213 input: list_plan,
214 dedup_cols: vec![0],
215 })
216 .into();
217 }
218
219 Ok(list_plan)
220 }
221
222 pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
223 self.core.catalog.clone()
224 }
225
226 pub fn clone_with_column_catalog(&self, column_catalog: Vec<ColumnCatalog>) -> Result<Self> {
227 let row_id_index = column_catalog.iter().position(|c| c.is_row_id_column());
228 let kind = self.core.kind.clone();
229 let ctx = self.core.ctx.clone();
230 let as_of = self.core.as_of.clone();
231 Self::new(
232 self.source_catalog(),
233 column_catalog,
234 row_id_index,
235 kind,
236 ctx,
237 as_of,
238 )
239 }
240}
241
// `LogicalSource` holds no input plans, so it uses the leaf plan-tree-node implementation.
impl_plan_tree_node_for_leaf! {LogicalSource}
243impl Distill for LogicalSource {
244 fn distill<'a>(&self) -> XmlNode<'a> {
245 let fields = if let Some(catalog) = self.source_catalog() {
246 let src = Pretty::from(catalog.name.clone());
247 let mut fields = vec![
248 ("source", src),
249 ("columns", column_names_pretty(self.schema())),
250 ];
251 if let Some(as_of) = &self.core.as_of {
252 fields.push(("as_of", Pretty::debug(as_of)));
253 }
254 fields
255 } else {
256 vec![]
257 };
258 childless_record("LogicalSource", fields)
259 }
260}
261
262impl ColPrunable for LogicalSource {
263 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
264 let mapping = ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
266 LogicalProject::with_mapping(self.clone().into(), mapping).into()
267 }
268}
269
270impl ExprRewritable for LogicalSource {
271 fn has_rewritable_expr(&self) -> bool {
272 self.output_exprs.is_some()
273 }
274
275 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
276 let mut output_exprs = self.output_exprs.clone();
277
278 for expr in output_exprs.iter_mut().flatten() {
279 *expr = r.rewrite_expr(expr.clone());
280 }
281
282 Self {
283 output_exprs,
284 ..self.clone()
285 }
286 .into()
287 }
288}
289
290impl ExprVisitable for LogicalSource {
291 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
292 self.output_exprs
293 .iter()
294 .flatten()
295 .for_each(|e| v.visit_expr(e));
296 }
297}
298
299impl PredicatePushdown for LogicalSource {
300 fn predicate_pushdown(
301 &self,
302 predicate: Condition,
303 _ctx: &mut PredicatePushdownContext,
304 ) -> PlanRef {
305 LogicalFilter::create(self.clone().into(), predicate)
306 }
307}
308
309impl ToBatch for LogicalSource {
310 fn to_batch(&self) -> Result<PlanRef> {
311 assert!(
312 !self.core.is_kafka_connector(),
313 "LogicalSource with a kafka property should be converted to LogicalKafkaScan"
314 );
315 assert!(
316 !self.core.is_iceberg_connector(),
317 "LogicalSource with a iceberg property should be converted to LogicalIcebergScan"
318 );
319 let mut plan: PlanRef = BatchSource::new(self.core.clone()).into();
320
321 if let Some(exprs) = &self.output_exprs {
322 let logical_project = generic::Project::new(exprs.to_vec(), plan);
323 plan = BatchProject::new(logical_project).into();
324 }
325
326 Ok(plan)
327 }
328}
329
330impl ToStream for LogicalSource {
331 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
332 let mut plan: PlanRef;
333
334 match self.core.kind {
335 SourceNodeKind::CreateTable | SourceNodeKind::CreateSharedSource => {
336 plan = Self::create_non_shared_source_plan(self.core.clone())?;
339 }
340 SourceNodeKind::CreateMViewOrBatch => {
341 let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared());
345 if use_shared_source {
346 plan = StreamSourceScan::new(self.core.clone()).into();
347 } else {
348 plan = Self::create_non_shared_source_plan(self.core.clone())?;
350 }
351
352 if let Some(exprs) = &self.output_exprs {
353 let logical_project = generic::Project::new(exprs.to_vec(), plan);
354 plan = StreamProject::new(logical_project).into();
355 }
356
357 if let Some(catalog) = self.source_catalog()
358 && !catalog.watermark_descs.is_empty()
359 {
360 plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
361 }
362
363 if let Some(row_id_index) = self.output_row_id_index {
364 plan = StreamRowIdGen::new_with_dist(
365 plan,
366 row_id_index,
367 HashShard(vec![row_id_index]),
368 )
369 .into();
370 }
371 }
372 }
373 Ok(plan)
374 }
375
376 fn logical_rewrite_for_stream(
377 &self,
378 _ctx: &mut RewriteStreamContext,
379 ) -> Result<(PlanRef, ColIndexMapping)> {
380 Ok((
381 self.clone().into(),
382 ColIndexMapping::identity(self.schema().len()),
383 ))
384 }
385}