risingwave_frontend/optimizer/plan_node/logical_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::bail;
19use risingwave_common::catalog::ColumnCatalog;
20use risingwave_pb::plan_common::GeneratedColumnDesc;
21use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
22use risingwave_sqlparser::ast::AsOf;
23
24use super::generic::{GenericPlanRef, SourceNodeKind};
25use super::stream_watermark_filter::StreamWatermarkFilter;
26use super::utils::{Distill, childless_record};
27use super::{
28    BatchProject, BatchSource, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalProject,
29    PlanBase, PlanRef, PredicatePushdown, StreamProject, StreamRowIdGen, StreamSource,
30    StreamSourceScan, ToBatch, ToStream, generic,
31};
32use crate::catalog::source_catalog::SourceCatalog;
33use crate::error::Result;
34use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef};
35use crate::optimizer::optimizer_context::OptimizerContextRef;
36use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
37use crate::optimizer::plan_node::stream_fs_fetch::StreamFsFetch;
38use crate::optimizer::plan_node::utils::column_names_pretty;
39use crate::optimizer::plan_node::{
40    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamDedup,
41    ToStreamContext,
42};
43use crate::optimizer::property::Distribution::HashShard;
44use crate::optimizer::property::{
45    Distribution, MonotonicityMap, Order, RequiredDist, WatermarkColumns,
46};
47use crate::utils::{ColIndexMapping, Condition, IndexRewriter};
48
/// `LogicalSource` is the logical plan node that returns the contents of a source (or another
/// equivalent object), optionally computing generated columns on top of the raw source columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalSource {
    /// Plan base. In `LogicalSource::new` it is derived from the full column list, i.e. before
    /// generated columns are excluded from `core`.
    pub base: PlanBase<Logical>,
    /// The generic source node. In `LogicalSource::new` generated columns are excluded from it,
    /// since they are computed by `output_exprs` rather than read from the source.
    pub core: generic::Source,

    /// Expressions to output, one per column (non-generated columns are `InputRef`s). This field
    /// is present only if there are generated columns, and it is turned into a `Project` when
    /// converting to a physical plan.
    pub(crate) output_exprs: Option<Vec<ExprImpl>>,
    /// When there are generated columns, the `StreamRowIdGen`'s `row_id_index` is different from
    /// the one in `core`, so the row-id index relative to `output_exprs` is stored here.
    pub(crate) output_row_id_index: Option<usize>,
}
62
63impl LogicalSource {
64    pub fn new(
65        source_catalog: Option<Rc<SourceCatalog>>,
66        column_catalog: Vec<ColumnCatalog>,
67        row_id_index: Option<usize>,
68        kind: SourceNodeKind,
69        ctx: OptimizerContextRef,
70        as_of: Option<AsOf>,
71    ) -> Result<Self> {
72        // XXX: should we reorder the columns?
73        // The order may be strange if the schema is changed, e.g., [foo:Varchar, _rw_kafka_timestamp:Timestamptz, _row_id:Serial, bar:Int32]
74        // related: https://github.com/risingwavelabs/risingwave/issues/16486
75        // The order does not matter much. The columns field is essentially a map indexed by the column id.
76        // It will affect what users will see in `SELECT *`.
77        // But not sure if we rely on the position of hidden column like `_row_id` somewhere. For `projected_row_id` we do so...
78        let core = generic::Source {
79            catalog: source_catalog,
80            column_catalog,
81            row_id_index,
82            kind,
83            ctx,
84            as_of,
85        };
86
87        if core.as_of.is_some() && !core.support_time_travel() {
88            bail!("Time travel is not supported for the source")
89        }
90
91        let base = PlanBase::new_logical_with_core(&core);
92
93        let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?;
94        let (core, output_row_id_index) = core.exclude_generated_columns();
95
96        Ok(LogicalSource {
97            base,
98            core,
99            output_exprs,
100            output_row_id_index,
101        })
102    }
103
104    pub fn with_catalog(
105        source_catalog: Rc<SourceCatalog>,
106        kind: SourceNodeKind,
107        ctx: OptimizerContextRef,
108        as_of: Option<AsOf>,
109    ) -> Result<Self> {
110        let column_catalogs = source_catalog.columns.clone();
111        let row_id_index = source_catalog.row_id_index;
112        if !source_catalog.append_only {
113            assert!(row_id_index.is_none());
114        }
115
116        Self::new(
117            Some(source_catalog),
118            column_catalogs,
119            row_id_index,
120            kind,
121            ctx,
122            as_of,
123        )
124    }
125
126    /// If there are no generated columns, returns `None`.
127    ///
128    /// Otherwise, the returned expressions correspond to all columns.
129    /// Non-generated columns are represented by `InputRef`.
130    pub fn derive_output_exprs_from_generated_columns(
131        columns: &[ColumnCatalog],
132    ) -> Result<Option<Vec<ExprImpl>>> {
133        if !columns.iter().any(|c| c.is_generated()) {
134            return Ok(None);
135        }
136
137        let col_mapping = {
138            let mut mapping = vec![None; columns.len()];
139            let mut cur = 0;
140            for (idx, column) in columns.iter().enumerate() {
141                if !column.is_generated() {
142                    mapping[idx] = Some(cur);
143                    cur += 1;
144                } else {
145                    mapping[idx] = None;
146                }
147            }
148            ColIndexMapping::new(mapping, columns.len())
149        };
150
151        let mut rewriter = IndexRewriter::new(col_mapping);
152        let mut exprs = Vec::with_capacity(columns.len());
153        let mut cur = 0;
154        for column in columns {
155            let column_desc = &column.column_desc;
156            let ret_data_type = column_desc.data_type.clone();
157
158            if let Some(GeneratedOrDefaultColumn::GeneratedColumn(generated_column)) =
159                &column_desc.generated_or_default_column
160            {
161                let GeneratedColumnDesc { expr } = generated_column;
162                // TODO(yuhao): avoid this `from_expr_proto`.
163                let proj_expr =
164                    rewriter.rewrite_expr(ExprImpl::from_expr_proto(expr.as_ref().unwrap())?);
165                let casted_expr = proj_expr.cast_assign(ret_data_type)?;
166                exprs.push(casted_expr);
167            } else {
168                let input_ref = InputRef {
169                    data_type: ret_data_type,
170                    index: cur,
171                };
172                cur += 1;
173                exprs.push(ExprImpl::InputRef(Box::new(input_ref)));
174            }
175        }
176
177        Ok(Some(exprs))
178    }
179
180    fn create_non_shared_source_plan(core: generic::Source) -> Result<PlanRef> {
181        let mut plan: PlanRef;
182        if core.is_new_fs_connector() {
183            plan = Self::create_list_plan(core.clone(), true)?;
184            plan = StreamFsFetch::new(plan, core.clone()).into();
185        } else if core.is_iceberg_connector() {
186            plan = Self::create_list_plan(core.clone(), false)?;
187            plan = StreamFsFetch::new(plan, core.clone()).into();
188        } else {
189            plan = StreamSource::new(core.clone()).into()
190        }
191        Ok(plan)
192    }
193
194    /// `StreamSource` (list) -> shuffle -> (optional) `StreamDedup`
195    fn create_list_plan(core: generic::Source, dedup: bool) -> Result<PlanRef> {
196        let logical_source = generic::Source::file_list_node(core);
197        let mut list_plan: PlanRef = StreamSource {
198            base: PlanBase::new_stream_with_core(
199                &logical_source,
200                Distribution::Single,
201                true, // `list` will keep listing all objects, it must be append-only
202                false,
203                WatermarkColumns::new(),
204                MonotonicityMap::new(),
205            ),
206            core: logical_source,
207        }
208        .into();
209        list_plan = RequiredDist::shard_by_key(list_plan.schema().len(), &[0])
210            .enforce_if_not_satisfies(list_plan, &Order::any())?;
211        if dedup {
212            list_plan = StreamDedup::new(generic::Dedup {
213                input: list_plan,
214                dedup_cols: vec![0],
215            })
216            .into();
217        }
218
219        Ok(list_plan)
220    }
221
222    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
223        self.core.catalog.clone()
224    }
225
226    pub fn clone_with_column_catalog(&self, column_catalog: Vec<ColumnCatalog>) -> Result<Self> {
227        let row_id_index = column_catalog.iter().position(|c| c.is_row_id_column());
228        let kind = self.core.kind.clone();
229        let ctx = self.core.ctx.clone();
230        let as_of = self.core.as_of.clone();
231        Self::new(
232            self.source_catalog(),
233            column_catalog,
234            row_id_index,
235            kind,
236            ctx,
237            as_of,
238        )
239    }
240}
241
// NOTE(review): the macro is defined elsewhere; presumably it provides the plan-tree-node
// boilerplate for a leaf (childless) node, which matches `LogicalSource` building its plans
// without any input plan.
impl_plan_tree_node_for_leaf! {LogicalSource}
243impl Distill for LogicalSource {
244    fn distill<'a>(&self) -> XmlNode<'a> {
245        let fields = if let Some(catalog) = self.source_catalog() {
246            let src = Pretty::from(catalog.name.clone());
247            let mut fields = vec![
248                ("source", src),
249                ("columns", column_names_pretty(self.schema())),
250            ];
251            if let Some(as_of) = &self.core.as_of {
252                fields.push(("as_of", Pretty::debug(as_of)));
253            }
254            fields
255        } else {
256            vec![]
257        };
258        childless_record("LogicalSource", fields)
259    }
260}
261
262impl ColPrunable for LogicalSource {
263    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
264        // TODO: iceberg source can prune columns
265        let mapping = ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
266        LogicalProject::with_mapping(self.clone().into(), mapping).into()
267    }
268}
269
270impl ExprRewritable for LogicalSource {
271    fn has_rewritable_expr(&self) -> bool {
272        self.output_exprs.is_some()
273    }
274
275    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
276        let mut output_exprs = self.output_exprs.clone();
277
278        for expr in output_exprs.iter_mut().flatten() {
279            *expr = r.rewrite_expr(expr.clone());
280        }
281
282        Self {
283            output_exprs,
284            ..self.clone()
285        }
286        .into()
287    }
288}
289
290impl ExprVisitable for LogicalSource {
291    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
292        self.output_exprs
293            .iter()
294            .flatten()
295            .for_each(|e| v.visit_expr(e));
296    }
297}
298
299impl PredicatePushdown for LogicalSource {
300    fn predicate_pushdown(
301        &self,
302        predicate: Condition,
303        _ctx: &mut PredicatePushdownContext,
304    ) -> PlanRef {
305        LogicalFilter::create(self.clone().into(), predicate)
306    }
307}
308
309impl ToBatch for LogicalSource {
310    fn to_batch(&self) -> Result<PlanRef> {
311        assert!(
312            !self.core.is_kafka_connector(),
313            "LogicalSource with a kafka property should be converted to LogicalKafkaScan"
314        );
315        assert!(
316            !self.core.is_iceberg_connector(),
317            "LogicalSource with a iceberg property should be converted to LogicalIcebergScan"
318        );
319        let mut plan: PlanRef = BatchSource::new(self.core.clone()).into();
320
321        if let Some(exprs) = &self.output_exprs {
322            let logical_project = generic::Project::new(exprs.to_vec(), plan);
323            plan = BatchProject::new(logical_project).into();
324        }
325
326        Ok(plan)
327    }
328}
329
330impl ToStream for LogicalSource {
331    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
332        let mut plan: PlanRef;
333
334        match self.core.kind {
335            SourceNodeKind::CreateTable | SourceNodeKind::CreateSharedSource => {
336                // Note: for create table, row_id and generated columns is created in plan_root.gen_table_plan.
337                // for shared source, row_id and generated columns is created after SourceBackfill node.
338                plan = Self::create_non_shared_source_plan(self.core.clone())?;
339            }
340            SourceNodeKind::CreateMViewOrBatch => {
341                // Create MV on source.
342                // We only check streaming_use_shared_source is true when `CREATE SOURCE`.
343                // The value does not affect the behavior of `CREATE MATERIALIZED VIEW` here.
344                let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared());
345                if use_shared_source {
346                    plan = StreamSourceScan::new(self.core.clone()).into();
347                } else {
348                    // non-shared source
349                    plan = Self::create_non_shared_source_plan(self.core.clone())?;
350                }
351
352                if let Some(exprs) = &self.output_exprs {
353                    let logical_project = generic::Project::new(exprs.to_vec(), plan);
354                    plan = StreamProject::new(logical_project).into();
355                }
356
357                if let Some(catalog) = self.source_catalog()
358                    && !catalog.watermark_descs.is_empty()
359                {
360                    plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
361                }
362
363                if let Some(row_id_index) = self.output_row_id_index {
364                    plan = StreamRowIdGen::new_with_dist(
365                        plan,
366                        row_id_index,
367                        HashShard(vec![row_id_index]),
368                    )
369                    .into();
370                }
371            }
372        }
373        Ok(plan)
374    }
375
376    fn logical_rewrite_for_stream(
377        &self,
378        _ctx: &mut RewriteStreamContext,
379    ) -> Result<(PlanRef, ColIndexMapping)> {
380        Ok((
381            self.clone().into(),
382            ColIndexMapping::identity(self.schema().len()),
383        ))
384    }
385}