risingwave_frontend/optimizer/plan_node/
logical_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::rc::Rc;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::bail;
19use risingwave_common::catalog::{
20    ColumnCatalog, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_FILE_POS_COLUMN_NAME,
21    ICEBERG_SEQUENCE_NUM_COLUMN_NAME, ROW_ID_COLUMN_NAME,
22};
23use risingwave_pb::plan_common::GeneratedColumnDesc;
24use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
25use risingwave_sqlparser::ast::AsOf;
26
27use super::generic::{GenericPlanRef, SourceNodeKind};
28use super::stream_watermark_filter::StreamWatermarkFilter;
29use super::utils::{Distill, childless_record};
30use super::{
31    BatchProject, BatchSource, ColPrunable, ExprRewritable, Logical, LogicalFilter,
32    LogicalPlanRef as PlanRef, LogicalProject, PlanBase, PredicatePushdown, StreamPlanRef,
33    StreamProject, StreamRowIdGen, StreamSource, StreamSourceScan, ToBatch, ToStream, generic,
34};
35use crate::catalog::source_catalog::SourceCatalog;
36use crate::error::Result;
37use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor, InputRef};
38use crate::optimizer::optimizer_context::OptimizerContextRef;
39use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
40use crate::optimizer::plan_node::stream_fs_fetch::StreamFsFetch;
41use crate::optimizer::plan_node::utils::column_names_pretty;
42use crate::optimizer::plan_node::{
43    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamDedup,
44    ToStreamContext,
45};
46use crate::optimizer::property::Distribution::HashShard;
47use crate::optimizer::property::{
48    Distribution, MonotonicityMap, RequiredDist, StreamKind, WatermarkColumns,
49};
50use crate::utils::{ColIndexMapping, Condition, IndexRewriter};
51
/// `LogicalSource` returns contents of a table or other equivalent object
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalSource {
    /// Logical plan base derived from the source core.
    pub base: PlanBase<Logical>,
    /// The generic source description (catalog, columns, row-id index, kind, context, `AS OF`).
    pub core: generic::Source,

    /// Expressions to output. Present only if there are generated columns, in which case it is
    /// turned into a `Project` when converting to a physical plan.
    pub(crate) output_exprs: Option<Vec<ExprImpl>>,
    /// When there are generated columns, the `StreamRowIdGen`'s `row_id_index` is different from
    /// the one in `core`. So we store the one in `output_exprs` here.
    pub(crate) output_row_id_index: Option<usize>,
}
65
66impl LogicalSource {
67    pub fn new(
68        source_catalog: Option<Rc<SourceCatalog>>,
69        column_catalog: Vec<ColumnCatalog>,
70        row_id_index: Option<usize>,
71        kind: SourceNodeKind,
72        ctx: OptimizerContextRef,
73        as_of: Option<AsOf>,
74    ) -> Result<Self> {
75        // XXX: should we reorder the columns?
76        // The order may be strange if the schema is changed, e.g., [foo:Varchar, _rw_kafka_timestamp:Timestamptz, _row_id:Serial, bar:Int32]
77        // related: https://github.com/risingwavelabs/risingwave/issues/16486
78        // The order does not matter much. The columns field is essentially a map indexed by the column id.
79        // It will affect what users will see in `SELECT *`.
80        // But not sure if we rely on the position of hidden column like `_row_id` somewhere. For `projected_row_id` we do so...
81        let core = generic::Source {
82            catalog: source_catalog,
83            column_catalog,
84            row_id_index,
85            kind,
86            ctx,
87            as_of,
88        };
89
90        if core.as_of.is_some() && !core.support_time_travel() {
91            bail!("Time travel is not supported for the source")
92        }
93
94        let base = PlanBase::new_logical_with_core(&core);
95
96        let output_exprs = Self::derive_output_exprs_from_generated_columns(&core.column_catalog)?;
97        let (core, output_row_id_index) = core.exclude_generated_columns();
98
99        Ok(LogicalSource {
100            base,
101            core,
102            output_exprs,
103            output_row_id_index,
104        })
105    }
106
107    pub fn with_catalog(
108        source_catalog: Rc<SourceCatalog>,
109        kind: SourceNodeKind,
110        ctx: OptimizerContextRef,
111        as_of: Option<AsOf>,
112    ) -> Result<Self> {
113        let column_catalogs = source_catalog.columns.clone();
114        let row_id_index = source_catalog.row_id_index;
115        if !source_catalog.append_only {
116            assert!(row_id_index.is_none());
117        }
118
119        Self::new(
120            Some(source_catalog),
121            column_catalogs,
122            row_id_index,
123            kind,
124            ctx,
125            as_of,
126        )
127    }
128
129    /// If there are no generated columns, returns `None`.
130    ///
131    /// Otherwise, the returned expressions correspond to all columns.
132    /// Non-generated columns are represented by `InputRef`.
133    pub fn derive_output_exprs_from_generated_columns(
134        columns: &[ColumnCatalog],
135    ) -> Result<Option<Vec<ExprImpl>>> {
136        if !columns.iter().any(|c| c.is_generated()) {
137            return Ok(None);
138        }
139
140        let col_mapping = {
141            let mut mapping = vec![None; columns.len()];
142            let mut cur = 0;
143            for (idx, column) in columns.iter().enumerate() {
144                if !column.is_generated() {
145                    mapping[idx] = Some(cur);
146                    cur += 1;
147                } else {
148                    mapping[idx] = None;
149                }
150            }
151            ColIndexMapping::new(mapping, columns.len())
152        };
153
154        let mut rewriter = IndexRewriter::new(col_mapping);
155        let mut exprs = Vec::with_capacity(columns.len());
156        let mut cur = 0;
157        for column in columns {
158            let column_desc = &column.column_desc;
159            let ret_data_type = column_desc.data_type.clone();
160
161            if let Some(GeneratedOrDefaultColumn::GeneratedColumn(generated_column)) =
162                &column_desc.generated_or_default_column
163            {
164                let GeneratedColumnDesc { expr } = generated_column;
165                // TODO(yuhao): avoid this `from_expr_proto`.
166                let proj_expr =
167                    rewriter.rewrite_expr(ExprImpl::from_expr_proto(expr.as_ref().unwrap())?);
168                let casted_expr = proj_expr.cast_assign(&ret_data_type)?;
169                exprs.push(casted_expr);
170            } else {
171                let input_ref = InputRef {
172                    data_type: ret_data_type,
173                    index: cur,
174                };
175                cur += 1;
176                exprs.push(ExprImpl::InputRef(Box::new(input_ref)));
177            }
178        }
179
180        Ok(Some(exprs))
181    }
182
183    fn create_non_shared_source_plan(core: generic::Source) -> Result<StreamPlanRef> {
184        let mut plan;
185        if core.is_new_fs_connector() {
186            plan = Self::create_list_plan(core.clone(), true)?;
187            plan = StreamFsFetch::new(plan, core).into();
188        } else if core.is_iceberg_connector() || core.is_batch_connector() {
189            plan = Self::create_list_plan(core.clone(), false)?;
190            plan = StreamFsFetch::new(plan, core).into();
191        } else {
192            plan = StreamSource::new(core).into()
193        }
194        Ok(plan)
195    }
196
197    /// `StreamSource` (list) -> shuffle -> (optional) `StreamDedup`
198    fn create_list_plan(core: generic::Source, dedup: bool) -> Result<StreamPlanRef> {
199        let downstream_columns = core.column_catalog.clone();
200        let logical_source = generic::Source::file_list_node(core);
201        let mut list_plan: StreamPlanRef = StreamSource {
202            base: PlanBase::new_stream_with_core(
203                &logical_source,
204                Distribution::Single,
205                StreamKind::AppendOnly, // `list` will keep listing all objects, it must be append-only
206                false,
207                WatermarkColumns::new(),
208                MonotonicityMap::new(),
209            ),
210            core: logical_source,
211            downstream_columns: Some(downstream_columns),
212        }
213        .into();
214        list_plan = RequiredDist::shard_by_key(list_plan.schema().len(), &[0])
215            .streaming_enforce_if_not_satisfies(list_plan)?;
216        if dedup {
217            list_plan = StreamDedup::new(generic::Dedup {
218                input: list_plan,
219                dedup_cols: vec![0],
220            })
221            .into();
222        }
223
224        Ok(list_plan)
225    }
226
227    pub fn source_catalog(&self) -> Option<Rc<SourceCatalog>> {
228        self.core.catalog.clone()
229    }
230
231    pub fn clone_with_column_catalog(&self, column_catalog: Vec<ColumnCatalog>) -> Result<Self> {
232        let row_id_index = column_catalog.iter().position(|c| c.is_row_id_column());
233        let kind = self.core.kind.clone();
234        let ctx = self.core.ctx.clone();
235        let as_of = self.core.as_of.clone();
236        Self::new(
237            self.source_catalog(),
238            column_catalog,
239            row_id_index,
240            kind,
241            ctx,
242            as_of,
243        )
244    }
245
246    fn prune_col_for_iceberg_source(&self, required_cols: &[usize]) -> PlanRef {
247        assert!(self.core.is_iceberg_connector());
248        // Iceberg source supports column pruning at source level
249        // Schema invariant: [table columns] + [_iceberg_sequence_number, _iceberg_file_path, _iceberg_file_pos, _row_id]
250        // The last 4 columns are always: 3 iceberg hidden columns + _row_id
251        let schema_len = self.schema().len();
252        assert!(
253            schema_len >= 4,
254            "Iceberg source must have at least 4 columns (3 iceberg hidden + 1 row_id)"
255        );
256
257        assert_eq!(
258            self.core.column_catalog[schema_len - 4].name(),
259            ICEBERG_SEQUENCE_NUM_COLUMN_NAME
260        );
261        assert_eq!(
262            self.core.column_catalog[schema_len - 3].name(),
263            ICEBERG_FILE_PATH_COLUMN_NAME
264        );
265        assert_eq!(
266            self.core.column_catalog[schema_len - 2].name(),
267            ICEBERG_FILE_POS_COLUMN_NAME
268        );
269        assert_eq!(
270            self.core.column_catalog[schema_len - 1].name(),
271            ROW_ID_COLUMN_NAME
272        );
273        assert_eq!(self.output_row_id_index, Some(self.schema().len() - 1));
274
275        let iceberg_start_idx = schema_len - 4;
276        let row_id_idx = schema_len - 1;
277
278        // Build source_cols: table columns from required_cols + always keep last 4 columns
279        let mut source_cols = Vec::new();
280
281        // Collect table columns (before the last 4 columns) from required_cols
282        for &idx in required_cols {
283            if idx < iceberg_start_idx {
284                // Regular table column
285                source_cols.push(idx);
286            }
287        }
288
289        // Always append the last 4 columns: [_iceberg_sequence_number, _iceberg_file_path, _iceberg_file_pos, _row_id]
290        source_cols.extend([
291            iceberg_start_idx,
292            iceberg_start_idx + 1,
293            iceberg_start_idx + 2,
294            row_id_idx,
295        ]);
296
297        // Clone with pruned columns - source_cols is never empty (always has last 4 columns)
298        let mut core = self.core.clone();
299        core.column_catalog = source_cols
300            .iter()
301            .map(|idx| core.column_catalog[*idx].clone())
302            .collect();
303        // row_id is always at the last position in the pruned schema
304        core.row_id_index = Some(source_cols.len() - 1);
305
306        let base = PlanBase::new_logical_with_core(&core);
307        let output_exprs =
308            Self::derive_output_exprs_from_generated_columns(&core.column_catalog).unwrap();
309        let (core, _) = core.exclude_generated_columns();
310
311        let pruned_source = LogicalSource {
312            base,
313            core,
314            output_exprs,
315            output_row_id_index: Some(source_cols.len() - 1),
316        };
317
318        // Build mapping from original schema indices to pruned schema indices
319        let mut old_to_new = vec![None; self.schema().len()];
320        for (new_idx, &old_idx) in source_cols.iter().enumerate() {
321            old_to_new[old_idx] = Some(new_idx);
322        }
323
324        // Map required_cols to indices in the pruned schema
325        let new_required: Vec<_> = required_cols
326            .iter()
327            .map(|&old_idx| old_to_new[old_idx].unwrap())
328            .collect();
329
330        let mapping =
331            ColIndexMapping::with_remaining_columns(&new_required, pruned_source.schema().len());
332        LogicalProject::with_mapping(pruned_source.into(), mapping).into()
333    }
334}
335
// Plan-tree-node boilerplate for `LogicalSource` as a leaf of the `Logical` convention.
// NOTE(review): the macro is defined elsewhere; confirm exactly which impls it generates.
impl_plan_tree_node_for_leaf! { Logical, LogicalSource}
337impl Distill for LogicalSource {
338    fn distill<'a>(&self) -> XmlNode<'a> {
339        let fields = if let Some(catalog) = self.source_catalog() {
340            let src = Pretty::from(catalog.name.clone());
341            let mut fields = vec![
342                ("source", src),
343                ("columns", column_names_pretty(self.schema())),
344            ];
345            if let Some(as_of) = &self.core.as_of {
346                fields.push(("as_of", Pretty::debug(as_of)));
347            }
348            fields
349        } else {
350            vec![]
351        };
352        childless_record("LogicalSource", fields)
353    }
354}
355
356impl ColPrunable for LogicalSource {
357    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
358        if self.core.is_iceberg_connector() {
359            self.prune_col_for_iceberg_source(required_cols)
360        } else {
361            // For other sources, use a LogicalProject to prune columns
362            let mapping =
363                ColIndexMapping::with_remaining_columns(required_cols, self.schema().len());
364            LogicalProject::with_mapping(self.clone().into(), mapping).into()
365        }
366    }
367}
368
369impl ExprRewritable<Logical> for LogicalSource {
370    fn has_rewritable_expr(&self) -> bool {
371        self.output_exprs.is_some()
372    }
373
374    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
375        let mut output_exprs = self.output_exprs.clone();
376
377        for expr in output_exprs.iter_mut().flatten() {
378            *expr = r.rewrite_expr(expr.clone());
379        }
380
381        Self {
382            output_exprs,
383            ..self.clone()
384        }
385        .into()
386    }
387}
388
389impl ExprVisitable for LogicalSource {
390    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
391        self.output_exprs
392            .iter()
393            .flatten()
394            .for_each(|e| v.visit_expr(e));
395    }
396}
397
398impl PredicatePushdown for LogicalSource {
399    fn predicate_pushdown(
400        &self,
401        predicate: Condition,
402        _ctx: &mut PredicatePushdownContext,
403    ) -> PlanRef {
404        LogicalFilter::create(self.clone().into(), predicate)
405    }
406}
407
408impl ToBatch for LogicalSource {
409    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
410        assert!(
411            !self.core.is_kafka_connector(),
412            "LogicalSource with a kafka property should be converted to LogicalKafkaScan"
413        );
414        assert!(
415            !self.core.is_iceberg_connector(),
416            "LogicalSource with a iceberg property should be converted to LogicalIcebergScan"
417        );
418        let mut plan = BatchSource::new(self.core.clone()).into();
419
420        if let Some(exprs) = &self.output_exprs {
421            let logical_project = generic::Project::new(exprs.clone(), plan);
422            plan = BatchProject::new(logical_project).into();
423        }
424
425        Ok(plan)
426    }
427}
428
429impl ToStream for LogicalSource {
430    fn to_stream(
431        &self,
432        _ctx: &mut ToStreamContext,
433    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
434        let mut plan;
435
436        match self.core.kind {
437            SourceNodeKind::CreateTable | SourceNodeKind::CreateSharedSource => {
438                // Note: for create table, row_id and generated columns is created in plan_root.gen_table_plan.
439                // for shared source, row_id and generated columns is created after SourceBackfill node.
440                plan = Self::create_non_shared_source_plan(self.core.clone())?;
441            }
442            SourceNodeKind::CreateMViewOrBatch => {
443                // Create MV on source.
444                // We only check streaming_use_shared_source is true when `CREATE SOURCE`.
445                // The value does not affect the behavior of `CREATE MATERIALIZED VIEW` here.
446                let use_shared_source = self.source_catalog().is_some_and(|c| c.info.is_shared());
447                if use_shared_source {
448                    plan = StreamSourceScan::new(self.core.clone()).into();
449                } else {
450                    // non-shared source
451                    plan = Self::create_non_shared_source_plan(self.core.clone())?;
452                }
453
454                if let Some(exprs) = &self.output_exprs {
455                    let logical_project = generic::Project::new(exprs.clone(), plan);
456                    plan = StreamProject::new(logical_project).into();
457                }
458
459                if let Some(catalog) = self.source_catalog()
460                    && !catalog.watermark_descs.is_empty()
461                {
462                    plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
463                }
464
465                if let Some(row_id_index) = self.output_row_id_index {
466                    plan = StreamRowIdGen::new_with_dist(
467                        plan,
468                        row_id_index,
469                        HashShard(vec![row_id_index]),
470                    )
471                    .into();
472                }
473            }
474        }
475        Ok(plan)
476    }
477
478    fn logical_rewrite_for_stream(
479        &self,
480        _ctx: &mut RewriteStreamContext,
481    ) -> Result<(PlanRef, ColIndexMapping)> {
482        Ok((
483            self.clone().into(),
484            ColIndexMapping::identity(self.schema().len()),
485        ))
486    }
487}