risingwave_frontend/optimizer/plan_node/
logical_values.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::vec;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::types::{DataType, ScalarImpl};
22
23use super::generic::GenericPlanRef;
24use super::utils::{Distill, childless_record};
25use super::{
26    BatchValues, ColPrunable, ExprRewritable, Logical, LogicalFilter, PlanBase, PlanRef,
27    PredicatePushdown, StreamValues, ToBatch, ToStream,
28};
29use crate::error::Result;
30use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, Literal};
31use crate::optimizer::optimizer_context::OptimizerContextRef;
32use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
33use crate::optimizer::plan_node::{
34    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
35};
36use crate::optimizer::property::FunctionalDependencySet;
37use crate::utils::{ColIndexMapping, Condition};
38
/// `LogicalValues` builds rows according to a list of expressions.
///
/// Each inner `Vec` of `rows` is one output row. The constructors check every
/// expression's return type against the schema field at the same position.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalValues {
    /// Common logical plan-node state, built by `PlanBase::new_logical` from the
    /// optimizer context, the schema, an optional key and the functional dependencies.
    pub base: PlanBase<Logical>,
    /// The row expressions, kept behind a reference-counted slice.
    rows: Arc<[Vec<ExprImpl>]>,
}
45
46impl LogicalValues {
47    /// Create a [`LogicalValues`] node. Used internally by optimizer.
48    pub fn new(rows: Vec<Vec<ExprImpl>>, schema: Schema, ctx: OptimizerContextRef) -> Self {
49        for exprs in &rows {
50            for (i, expr) in exprs.iter().enumerate() {
51                assert_eq!(schema.fields()[i].data_type(), expr.return_type())
52            }
53        }
54        let functional_dependency = FunctionalDependencySet::new(schema.len());
55        let base = PlanBase::new_logical(ctx, schema, None, functional_dependency);
56        Self {
57            rows: rows.into(),
58            base,
59        }
60    }
61
62    /// Used only by `LogicalValues.rewrite_logical_for_stream`, set the `_row_id` column as pk
63    fn new_with_pk(
64        rows: Vec<Vec<ExprImpl>>,
65        schema: Schema,
66        ctx: OptimizerContextRef,
67        pk_index: usize,
68    ) -> Self {
69        for exprs in &rows {
70            for (i, expr) in exprs.iter().enumerate() {
71                assert_eq!(schema.fields()[i].data_type(), expr.return_type())
72            }
73        }
74        let functional_dependency = FunctionalDependencySet::new(schema.len());
75        let base = PlanBase::new_logical(ctx, schema, Some(vec![pk_index]), functional_dependency);
76        Self {
77            rows: rows.into(),
78            base,
79        }
80    }
81
82    /// Create a [`LogicalValues`] node. Used by planner.
83    pub fn create(rows: Vec<Vec<ExprImpl>>, schema: Schema, ctx: OptimizerContextRef) -> PlanRef {
84        // No additional checks after binder.
85        Self::new(rows, schema, ctx).into()
86    }
87
88    /// Get a reference to the logical values' rows.
89    pub fn rows(&self) -> &[Vec<ExprImpl>] {
90        self.rows.as_ref()
91    }
92
93    pub(super) fn rows_pretty<'a>(&self) -> Pretty<'a> {
94        let data = self
95            .rows()
96            .iter()
97            .map(|row| {
98                let collect = row.iter().map(Pretty::debug).collect();
99                Pretty::Array(collect)
100            })
101            .collect();
102        Pretty::Array(data)
103    }
104}
105
// NOTE(review): the macro is defined elsewhere; judging by its name it generates the
// plan-tree-node impl for a node that has no inputs — confirm against its definition.
impl_plan_tree_node_for_leaf! { LogicalValues }
107impl Distill for LogicalValues {
108    fn distill<'a>(&self) -> XmlNode<'a> {
109        let data = self.rows_pretty();
110        let fields = vec![("rows", data), ("schema", Pretty::debug(&self.schema()))];
111        childless_record("LogicalValues", fields)
112    }
113}
114
115impl ExprRewritable for LogicalValues {
116    fn has_rewritable_expr(&self) -> bool {
117        true
118    }
119
120    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
121        let mut new = self.clone();
122        new.rows = new
123            .rows
124            .iter()
125            .map(|exprs| {
126                exprs
127                    .iter()
128                    .map(|e| r.rewrite_expr(e.clone()))
129                    .collect::<Vec<_>>()
130            })
131            .collect::<Vec<_>>()
132            .into();
133        new.base = new.base.clone_with_new_plan_id();
134        new.into()
135    }
136}
137
138impl ExprVisitable for LogicalValues {
139    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
140        self.rows.iter().flatten().for_each(|e| v.visit_expr(e));
141    }
142}
143
144impl ColPrunable for LogicalValues {
145    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
146        let rows = self
147            .rows
148            .iter()
149            .map(|row| required_cols.iter().map(|i| row[*i].clone()).collect())
150            .collect();
151        let fields = required_cols
152            .iter()
153            .map(|i| self.schema().fields[*i].clone())
154            .collect();
155        Self::new(rows, Schema { fields }, self.base.ctx().clone()).into()
156    }
157}
158
159impl PredicatePushdown for LogicalValues {
160    fn predicate_pushdown(
161        &self,
162        predicate: Condition,
163        _ctx: &mut PredicatePushdownContext,
164    ) -> PlanRef {
165        LogicalFilter::create(self.clone().into(), predicate)
166    }
167}
168
169impl ToBatch for LogicalValues {
170    fn to_batch(&self) -> Result<PlanRef> {
171        Ok(BatchValues::new(self.clone()).into())
172    }
173}
174
175impl ToStream for LogicalValues {
176    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
177        Ok(StreamValues::new(self.clone()).into())
178    }
179
180    fn logical_rewrite_for_stream(
181        &self,
182        _ctx: &mut RewriteStreamContext,
183    ) -> Result<(PlanRef, ColIndexMapping)> {
184        let row_id_index = self.schema().len();
185        let col_index_mapping = ColIndexMapping::identity_or_none(row_id_index, row_id_index + 1);
186        let ctx = self.ctx();
187        let mut schema = self.schema().clone();
188        schema
189            .fields
190            .push(Field::with_name(DataType::Int64, "_row_id"));
191        let rows = self.rows().to_owned();
192        let row_with_id = rows
193            .into_iter()
194            .enumerate()
195            .map(|(i, mut r)| {
196                r.push(Literal::new(Some(ScalarImpl::Int64(i as i64)), DataType::Int64).into());
197                r
198            })
199            .collect_vec();
200        let logical_values = Self::new_with_pk(row_with_id, schema, ctx, row_id_index);
201        Ok((logical_values.into(), col_index_mapping))
202    }
203}
204
#[cfg(test)]
mod tests {

    use risingwave_common::types::Datum;

    use super::*;
    use crate::optimizer::optimizer_context::OptimizerContext;

    /// Build an `Int32` literal expression holding `val`.
    fn literal(val: i32) -> ExprImpl {
        Literal::new(Datum::Some(val.into()), DataType::Int32).into()
    }

    /// Pruning
    /// ```text
    /// Values([[0, 1, 2], [3, 4, 5]])
    /// ```
    /// with required columns [0, 2] will result in
    /// ```text
    /// Values([[0, 2], [3, 5]])
    /// ```
    #[tokio::test]
    async fn test_prune_filter() {
        let ctx = OptimizerContext::mock().await;
        let fields = ["v1", "v2", "v3"]
            .into_iter()
            .map(|name| Field::with_name(DataType::Int32, name))
            .collect();
        let schema = Schema::new(fields);

        // Values([[0, 1, 2], [3, 4, 5]])
        let input_rows = vec![
            vec![literal(0), literal(1), literal(2)],
            vec![literal(3), literal(4), literal(5)],
        ];
        let values: PlanRef = LogicalValues::new(input_rows, schema, ctx).into();

        let required_cols = vec![0, 2];
        let pruned = values.prune_col(
            &required_cols,
            &mut ColumnPruningContext::new(values.clone()),
        );

        let values = pruned.as_logical_values().unwrap();
        let rows: &[Vec<ExprImpl>] = values.rows();

        // expected output: Values([[0, 2], [3, 5]])
        let expected = vec![
            vec![literal(0), literal(2)],
            vec![literal(3), literal(5)],
        ];
        assert_eq!(rows, expected.as_slice());
    }
}