// risingwave_frontend/optimizer/plan_node/logical_source.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::bail;
19use risingwave_common::catalog::{
20    ColumnCatalog, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME,
21    ICEBERG_SEQUENCE_NUM_COLUMN_NAME, ROW_ID_COLUMN_NAME,
22};
23use risingwave_pb::plan_common::GeneratedColumnDesc;
24use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
25use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;
26use risingwave_sqlparser::ast::AsOf;
27
28use super::generic::{GenericPlanRef, SourceNodeKind};
29use super::stream_watermark_filter::StreamWatermarkFilter;
30use super::utils::{Distill, childless_record};
31use super::{
32    BatchProject, BatchSource, ColPrunable, ExprRewritable, Logical, LogicalFilter,
33    LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PredicatePushdown, StreamPlanRef,
34    StreamProject, StreamRowIdGen, StreamSource, StreamSourceScan, ToBatch, ToStream, generic,
35};
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::error::Result;
38use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef};
39use crate::optimizer::optimizer_context::OptimizerContextRef;
40use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
41use crate::optimizer::plan_node::stream_fs_fetch::StreamFsFetch;
42use crate::optimizer::plan_node::utils::column_names_pretty;
43use crate::optimizer::plan_node::{
44    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamDedup,
45    ToStreamContext,
46};
47use crate::optimizer::property::Distribution::HashShard;
48use crate::optimizer::property::{
49    Distribution, MonotonicityMap, RequiredDist, StreamKind, WatermarkColumns,
50};
51use crate::utils::{ColIndexMapping, Condition, IndexRewriter};
52
53/// `LogicalSource` returns contents of a table or other equivalent object
54#[derive(Debug, Clone, PartialEq, Eq, Hash)]
55pub struct LogicalSource {
56    pub base: PlanBase<Logical>,
57    pub core: generic::Source,
58
59    /// Expressions to output. This field presents and will be turned to a `Project` when
60    /// converting to a physical plan, only if there are generated columns.
61    pub(crate) output_exprs: Option<Vec<ExprImpl>>,
62    /// When there are generated columns, the `StreamRowIdGen`'s `row_id_index` is different from
63    /// the one in `core`. So we store the one in `output_exprs` here.
64    pub(crate) output_row_id_index: Option<usize>,
65}
66
67impl LogicalSource {
68    pub fn new(
69        source_catalog: Option<Rc<SourceCatalog>>,
70        column_catalog: Vec<ColumnCatalog>,
71        row_id_index: Option<usize>,
72        kind: SourceNodeKind,
73        ctx: OptimizerContextRef,
74        as_of: Option<AsOf>,
75    ) -> Result<Self> {
76        // XXX: should we reorder the columns?
77        // The order may be strange if the schema is changed, e.g., [foo:Varchar, _rw_kafka_timestamp:Timestamptz, _row_id:Serial, bar:Int32]
78        // related: https://github.com/risingwavelabs/risingwave/issues/16486
79        // The order does not matter much. The columns field is essentially a map indexed by the column id.
80        // It will affect what users will see in `SELECT *`.
81        // But not sure if we rely on the position of hidden column like `_row_id` somewhere. For `projected_row_id` we do so...
82        let core = generic::Source {
83            catalog: source_catalog,
84            column_catalog,
85            row_id_index,
86            kind,
87            ctx,
88            as_of,
89        };
90
91        if core.as_of.is_some() && !core.support_time_travel() {
92            bail!("Time travel is not supported for the source")
93        }
94
95        let base = PlanBase::new_logical_with_core(&core);
96
97        let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?;
98        let (core, output_row_id_index) = core.exclude_generated_columns();
99
100        Ok(LogicalSource {
101            base,
102            core,
103            output_exprs,
104            output_row_id_index,
105        })
106    }
107
108    pub fn with_catalog(
109        source_catalog: Rc<SourceCatalog>,
110        kind: SourceNodeKind,
111        ctx: OptimizerContextRef,
112        as_of: Option<AsOf>,
113    ) -> Result<Self> {
114        let column_catalogs = source_catalog.columns.clone();
115        let row_id_index = source_catalog.row_id_index;
116        if !source_catalog.append_only {
117            assert!(row_id_index.is_none());
118        }
119
120        Self::new(
121            Some(source_catalog),
122            column_catalogs,
123            row_id_index,
124            kind,
125            ctx,
126            as_of,
127        )
128    }
129
130    /// If there are no generated columns, returns `None`.
131    ///
132    /// Otherwise, the returned expressions correspond to all columns.
133    /// Non-generated columns are represented by `InputRef`.
134    pub fn derive_output_exprs_from_generated_columns(
135        columns: &[ColumnCatalog],
136    ) -> Result<Option<Vec<ExprImpl>>> {
137        if !columns.iter().any(|c| c.is_generated()) {
138            return Ok(None);
139        }
140
141        let col_mapping = {
142            let mut mapping = vec![None; columns.len()];
143            let mut cur = 0;
144            for (idx, column) in columns.iter().enumerate() {
145                if !column.is_generated() {
146                    mapping[idx] = Some(cur);
147                    cur += 1;
148                } else {
149                    mapping[idx] = None;
150                }
151            }
152            ColIndexMapping::new(mapping, columns.len())
153        };
154
155        let mut rewriter = IndexRewriter::new(col_mapping);
156        let mut exprs = Vec::with_capacity(columns.len());
157        let mut cur = 0;
158        for column in columns {
159            let column_desc = &column.column_desc;
160            let ret_data_type = column_desc.data_type.clone();
161
162            if let Some(GeneratedOrDefaultColumn::GeneratedColumn(generated_column)) =
163                &column_desc.generated_or_default_column
164            {
165                let GeneratedColumnDesc { expr } = generated_column;
166                // TODO(yuhao): avoid this `from_expr_proto`.
167                let proj_expr =
168                    rewriter.rewrite_expr(ExprImpl::from_expr_proto(expr.as_ref().unwrap())?);
169                let casted_expr = proj_expr.cast_assign(&ret_data_type)?;
170                exprs.push(casted_expr);
171            } else {
172                let input_ref = InputRef {
173                    data_type: ret_data_type,
174                    index: cur,
175                };
176                cur += 1;
177                exprs.push(ExprImpl::InputRef(Box::new(input_ref)));
178            }
179        }
180
181        Ok(Some(exprs))
182    }
183
184    fn create_non_shared_source_plan(core: generic::Source) -> Result<StreamPlanRef> {
185        let mut plan;
186        if core.is_new_fs_connector() {
187            plan = Self::create_list_plan(core.clone(), true)?;
188            plan = StreamFsFetch::new(plan, core).into();
189        } else if core.is_iceberg_connector() || core.is_batch_connector() {
190            plan = Self::create_list_plan(core.clone(), false)?;
191            plan = StreamFsFetch::new(plan, core).into();
192        } else {
193            plan = StreamSource::new(core).into()
194        }
195        Ok(plan)
196    }
197
198    /// `StreamSource` (list) -> shuffle -> (optional) `StreamDedup`
199    fn create_list_plan(core: generic::Source, dedup: bool) -> Result<StreamPlanRef> {
200        let downstream_columns = core.column_catalog.clone();
201        let logical_source = generic::Source::file_list_node(core);
202        let mut list_plan: StreamPlanRef = StreamSource {
203            base: PlanBase::new_stream_with_core(
204                &logical_source,
205                Distribution::Single,
206                StreamKind::AppendOnly, // `list` will keep listing all objects, it must be append-only
207                false,
208                WatermarkColumns::new(),
209                MonotonicityMap::new(),
210            ),
211            core: logical_source,
212            downstream_columns: Some(downstream_columns),
213        }
214        .into();
215        list_plan = RequiredDist::shard_by_key(list_plan.schema().len(), &[0])
216            .streaming_enforce_if_not_satisfies(list_plan)?;
217        if dedup {
218            list_plan = StreamDedup::new(generic::Dedup {
219                input: list_plan,
220                dedup_cols: vec![0],
221            })
222            .into();
223        }
224
225        Ok(list_plan)
226    }
227
228    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
229        self.core.catalog.clone()
230    }
231
232    pub fn clone_with_column_catalog(&self, column_catalog: Vec<ColumnCatalog>) -> Result<Self> {
233        let row_id_index = column_catalog.iter().position(|c| c.is_row_id_column());
234        let kind = self.core.kind.clone();
235        let ctx = self.core.ctx.clone();
236        let as_of = self.core.as_of.clone();
237        Self::new(
238            self.source_catalog(),
239            column_catalog,
240            row_id_index,
241            kind,
242            ctx,
243            as_of,
244        )
245    }
246
247    fn prune_col_for_iceberg_source(&self, required_cols: &[usize]) -> PlanRef {
248        assert!(self.core.is_iceberg_connector());
249        // Iceberg source supports column pruning at source level
250        // Schema invariant: [table columns] + [_iceberg_sequence_number, _iceberg_file_path, _iceberg_file_pos, _row_id]
251        // The last 4 columns are always: 3 iceberg hidden columns + _row_id
252
253        let schema_len = self.schema().len();
254        assert!(
255            schema_len >= 4,
256            "Iceberg source must have at least 4 columns (3 iceberg hidden + 1 row_id)"
257        );
258
259        assert_eq!(
260            self.core.column_catalog[schema_len - 4].name(),
261            ICEBERG_SEQUENCE_NUM_COLUMN_NAME
262        );
263        assert_eq!(
264            self.core.column_catalog[schema_len - 3].name(),
265            ICEBERG_FILE_PATH_COLUMN_NAME
266        );
267        assert_eq!(
268            self.core.column_catalog[schema_len - 2].name(),
269            ICEBERG_FILE_POS_COLUMN_NAME
270        );
271        assert_eq!(
272            self.core.column_catalog[schema_len - 1].name(),
273            ROW_ID_COLUMN_NAME
274        );
275        assert_eq!(self.output_row_id_index, Some(self.schema().len() - 1));
276
277        let iceberg_start_idx = schema_len - 4;
278        let row_id_idx = schema_len - 1;
279
280        // Build source_cols: table columns from required_cols + always keep last 4 columns
281        let mut source_cols = Vec::new();
282
283        // Collect table columns (before the last 4 columns) from required_cols
284        for &idx in required_cols {
285            if idx < iceberg_start_idx {
286                // Regular table column
287                source_cols.push(idx);
288            }
289        }
290
291        // Always append the last 4 columns: [_iceberg_sequence_number, _iceberg_file_path, _iceberg_file_pos, _row_id]
292        source_cols.extend([
293            iceberg_start_idx,
294            iceberg_start_idx + 1,
295            iceberg_start_idx + 2,
296            row_id_idx,
297        ]);
298
299        // Clone with pruned columns - source_cols is never empty (always has last 4 columns)
300        let mut core = self.core.clone();
301        core.column_catalog = source_cols
302            .iter()
303            .map(|idx| core.column_catalog[*idx].clone())
304            .collect();
305        // row_id is always at the last position in the pruned schema
306        core.row_id_index = Some(source_cols.len() - 1);
307
308        let base = PlanBase::new_logical_with_core(&core);
309        let output_exprs =
310            Self::derive_output_exprs_from_generated_columns(&core.column_catalog).unwrap();
311        let (core, _) = core.exclude_generated_columns();
312
313        let pruned_source = LogicalSource {
314            base,
315            core,
316            output_exprs,
317            output_row_id_index: Some(source_cols.len() - 1),
318        };
319
320        // Build mapping from original schema indices to pruned schema indices
321        let mut old_to_new = vec![None; self.schema().len()];
322        for (new_idx, &old_idx) in source_cols.iter().enumerate() {
323            old_to_new[old_idx] = Some(new_idx);
324        }
325
326        // Map required_cols to indices in the pruned schema
327        let new_required: Vec<_> = required_cols
328            .iter()
329            .map(|&old_idx| old_to_new[old_idx].unwrap())
330            .collect();
331
332        let mapping =
333            ColIndexMapping::with_remaining_columns(&new_required, pruned_source.schema().len());
334        LogicalProject::with_mapping(pruned_source.into(), mapping).into()
335    }
336}
337
338impl_plan_tree_node_for_leaf! { Logical, LogicalSource}
339impl Distill for LogicalSource {
340    fn distill<'a>(&self) -> XmlNode<'a> {
341        let fields = if let Some(catalog) = self.source_catalog() {
342            let src = Pretty::from(catalog.name.clone());
343            let mut fields = vec![
344                ("source", src),
345                ("columns", column_names_pretty(self.schema())),
346            ];
347            if let Some(as_of) = &self.core.as_of {
348                fields.push(("as_of", Pretty::debug(as_of)));
349            }
350            fields
351        } else {
352            vec![]
353        };
354        childless_record("LogicalSource", fields)
355    }
356}
357
358impl ColPrunable for LogicalSource {
359    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
360        let is_refreshable_iceberg = self.source_catalog().is_some_and(|catalog| {
361            catalog.refresh_mode.is_some_and(|refresh_mode| {
362                matches!(refresh_mode.refresh_mode, Some(RefreshMode::FullReload(_)))
363            })
364        }); // for refreshable iceberg table, we does not expose iceberg hidden columns to the user
365
366        if self.core.is_iceberg_connector() && !is_refreshable_iceberg {
367            self.prune_col_for_iceberg_source(required_cols)
368        } else {
369            // For other sources, use a LogicalProject to prune columns
370            let mapping =
371                ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
372            LogicalProject::with_mapping(self.clone().into(), mapping).into()
373        }
374    }
375}
376
377impl ExprRewritable<Logical> for LogicalSource {
378    fn has_rewritable_expr(&self) -> bool {
379        self.output_exprs.is_some()
380    }
381
382    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
383        let mut output_exprs = self.output_exprs.clone();
384
385        for expr in output_exprs.iter_mut().flatten() {
386            *expr = r.rewrite_expr(expr.clone());
387        }
388
389        Self {
390            output_exprs,
391            ..self.clone()
392        }
393        .into()
394    }
395}
396
397impl ExprVisitable for LogicalSource {
398    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
399        self.output_exprs
400            .iter()
401            .flatten()
402            .for_each(|e| v.visit_expr(e));
403    }
404}
405
406impl PredicatePushdown for LogicalSource {
407    fn predicate_pushdown(
408        &self,
409        predicate: Condition,
410        _ctx: &mut PredicatePushdownContext,
411    ) -> PlanRef {
412        LogicalFilter::create(self.clone().into(), predicate)
413    }
414}
415
416impl ToBatch for LogicalSource {
417    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
418        assert!(
419            !self.core.is_kafka_connector(),
420            "LogicalSource with a kafka property should be converted to LogicalKafkaScan"
421        );
422        assert!(
423            !self.core.is_iceberg_connector(),
424            "LogicalSource with a iceberg property should be converted to LogicalIcebergScan"
425        );
426        let mut plan = BatchSource::new(self.core.clone()).into();
427
428        if let Some(exprs) = &self.output_exprs {
429            let logical_project = generic::Project::new(exprs.clone(), plan);
430            plan = BatchProject::new(logical_project).into();
431        }
432
433        Ok(plan)
434    }
435}
436
437impl ToStream for LogicalSource {
438    fn to_stream(
439        &self,
440        _ctx: &mut ToStreamContext,
441    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
442        let mut plan;
443
444        match self.core.kind {
445            SourceNodeKind::CreateTable | SourceNodeKind::CreateSharedSource => {
446                // Note: for create table, row_id and generated columns is created in plan_root.gen_table_plan.
447                // for shared source, row_id and generated columns is created after SourceBackfill node.
448                plan = Self::create_non_shared_source_plan(self.core.clone())?;
449            }
450            SourceNodeKind::CreateMViewOrBatch => {
451                // Create MV on source.
452                // We only check streaming_use_shared_source is true when `CREATE SOURCE`.
453                // The value does not affect the behavior of `CREATE MATERIALIZED VIEW` here.
454                let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared());
455                if use_shared_source {
456                    plan = StreamSourceScan::new(self.core.clone()).into();
457                } else {
458                    // non-shared source
459                    plan = Self::create_non_shared_source_plan(self.core.clone())?;
460                }
461
462                if let Some(exprs) = &self.output_exprs {
463                    let logical_project = generic::Project::new(exprs.clone(), plan);
464                    plan = StreamProject::new(logical_project).into();
465                }
466
467                if let Some(catalog) = self.source_catalog()
468                    && !catalog.watermark_descs.is_empty()
469                {
470                    plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
471                }
472
473                if let Some(row_id_index) = self.output_row_id_index {
474                    plan = StreamRowIdGen::new_with_dist(
475                        plan,
476                        row_id_index,
477                        HashShard(vec![row_id_index]),
478                    )
479                    .into();
480                }
481            }
482        }
483        Ok(plan)
484    }
485
486    fn logical_rewrite_for_stream(
487        &self,
488        _ctx: &mut RewriteStreamContext,
489    ) -> Result<(PlanRef, ColIndexMapping)> {
490        Ok((
491            self.clone().into(),
492            ColIndexMapping::identity(self.schema().len()),
493        ))
494    }
495}