risingwave_frontend/optimizer/plan_node/logical_values.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::vec;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::types::{DataType, ScalarImpl};
22
23use super::generic::GenericPlanRef;
24use super::utils::{Distill, childless_record};
25use super::{
26    BatchValues, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalPlanRef as PlanRef,
27    PlanBase, PredicatePushdown, StreamValues, ToBatch, ToStream,
28};
29use crate::error::Result;
30use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, Literal};
31use crate::optimizer::optimizer_context::OptimizerContextRef;
32use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
33use crate::optimizer::plan_node::{
34    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
35};
36use crate::optimizer::property::FunctionalDependencySet;
37use crate::utils::{ColIndexMapping, Condition};
38
/// `LogicalValues` builds rows according to a list of expressions.
///
/// Each row is a list of expressions laid out in schema column order; the constructors assert
/// that every expression's return type equals the data type of its schema field.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalValues {
    /// Common plan metadata, built from the optimizer context, schema, optional pk and
    /// functional dependencies passed to `PlanBase::new_logical`.
    pub base: PlanBase<Logical>,
    /// The literal rows. Stored behind an `Arc` so cloning the node only bumps a refcount
    /// instead of copying every expression.
    rows: Arc<[Vec<ExprImpl>]>,
}
45
46impl LogicalValues {
47    /// Create a [`LogicalValues`] node. Used internally by optimizer.
48    pub fn new(rows: Vec<Vec<ExprImpl>>, schema: Schema, ctx: OptimizerContextRef) -> Self {
49        for exprs in &rows {
50            for (i, expr) in exprs.iter().enumerate() {
51                assert_eq!(schema.fields()[i].data_type(), expr.return_type())
52            }
53        }
54        let functional_dependency = FunctionalDependencySet::new(schema.len());
55        let base = PlanBase::new_logical(ctx, schema, None, functional_dependency);
56        Self {
57            rows: rows.into(),
58            base,
59        }
60    }
61
62    /// Used only by `LogicalValues.rewrite_logical_for_stream`, set the `_row_id` column as pk
63    fn new_with_pk(
64        rows: Vec<Vec<ExprImpl>>,
65        schema: Schema,
66        ctx: OptimizerContextRef,
67        pk_index: usize,
68    ) -> Self {
69        for exprs in &rows {
70            for (i, expr) in exprs.iter().enumerate() {
71                assert_eq!(schema.fields()[i].data_type(), expr.return_type())
72            }
73        }
74        let functional_dependency = FunctionalDependencySet::new(schema.len());
75        let base = PlanBase::new_logical(ctx, schema, Some(vec![pk_index]), functional_dependency);
76        Self {
77            rows: rows.into(),
78            base,
79        }
80    }
81
82    /// Create a [`LogicalValues`] node. Used by planner.
83    pub fn create(rows: Vec<Vec<ExprImpl>>, schema: Schema, ctx: OptimizerContextRef) -> PlanRef {
84        // No additional checks after binder.
85        Self::new(rows, schema, ctx).into()
86    }
87
88    /// Create a [`LogicalValues`] node with a single empty row, as a dummy input for `Project` or `ProjectSet`.
89    pub fn create_empty_scalar(ctx: OptimizerContextRef) -> PlanRef {
90        Self::new(vec![vec![]], Schema::new(vec![]), ctx).into()
91    }
92
93    /// Check whether this is an empty scalar, typically created by [`LogicalValues::create_empty_scalar`].
94    pub fn is_empty_scalar(&self) -> bool {
95        self.schema().is_empty() && self.rows.len() == 1 && self.rows[0].is_empty()
96    }
97
98    /// Get a reference to the logical values' rows.
99    pub fn rows(&self) -> &[Vec<ExprImpl>] {
100        self.rows.as_ref()
101    }
102
103    pub(super) fn rows_pretty<'a>(&self) -> Pretty<'a> {
104        let data = self
105            .rows()
106            .iter()
107            .map(|row| {
108                let collect = row.iter().map(Pretty::debug).collect();
109                Pretty::Array(collect)
110            })
111            .collect();
112        Pretty::Array(data)
113    }
114}
115
// `LogicalValues` reads no input plans; this macro presumably supplies the plan-tree-node
// boilerplate for a leaf (childless) logical node — confirm against the macro definition.
impl_plan_tree_node_for_leaf! { Logical, LogicalValues }
117impl Distill for LogicalValues {
118    fn distill<'a>(&self) -> XmlNode<'a> {
119        let data = self.rows_pretty();
120        let fields = vec![("rows", data), ("schema", Pretty::debug(&self.schema()))];
121        childless_record("LogicalValues", fields)
122    }
123}
124
125impl ExprRewritable<Logical> for LogicalValues {
126    fn has_rewritable_expr(&self) -> bool {
127        true
128    }
129
130    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
131        let mut new = self.clone();
132        new.rows = new
133            .rows
134            .iter()
135            .map(|exprs| {
136                exprs
137                    .iter()
138                    .map(|e| r.rewrite_expr(e.clone()))
139                    .collect::<Vec<_>>()
140            })
141            .collect::<Vec<_>>()
142            .into();
143        new.base = new.base.clone_with_new_plan_id();
144        new.into()
145    }
146}
147
148impl ExprVisitable for LogicalValues {
149    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
150        self.rows.iter().flatten().for_each(|e| v.visit_expr(e));
151    }
152}
153
154impl ColPrunable for LogicalValues {
155    fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
156        let rows = self
157            .rows
158            .iter()
159            .map(|row| required_cols.iter().map(|i| row[*i].clone()).collect())
160            .collect();
161        let fields = required_cols
162            .iter()
163            .map(|i| self.schema().fields[*i].clone())
164            .collect();
165        Self::new(rows, Schema { fields }, self.base.ctx()).into()
166    }
167}
168
169impl PredicatePushdown for LogicalValues {
170    fn predicate_pushdown(
171        &self,
172        predicate: Condition,
173        _ctx: &mut PredicatePushdownContext,
174    ) -> PlanRef {
175        LogicalFilter::create(self.clone().into(), predicate)
176    }
177}
178
179impl ToBatch for LogicalValues {
180    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
181        Ok(BatchValues::new(self.clone()).into())
182    }
183}
184
185impl ToStream for LogicalValues {
186    fn to_stream(
187        &self,
188        _ctx: &mut ToStreamContext,
189    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
190        Ok(StreamValues::new(self.clone()).into())
191    }
192
193    fn logical_rewrite_for_stream(
194        &self,
195        _ctx: &mut RewriteStreamContext,
196    ) -> Result<(PlanRef, ColIndexMapping)> {
197        let row_id_index = self.schema().len();
198        let col_index_mapping = ColIndexMapping::identity_or_none(row_id_index, row_id_index + 1);
199        let ctx = self.ctx();
200        let mut schema = self.schema().clone();
201        schema
202            .fields
203            .push(Field::with_name(DataType::Int64, "_row_id"));
204        let rows = self.rows().to_owned();
205        let row_with_id = rows
206            .into_iter()
207            .enumerate()
208            .map(|(i, mut r)| {
209                r.push(Literal::new(Some(ScalarImpl::Int64(i as i64)), DataType::Int64).into());
210                r
211            })
212            .collect_vec();
213        let logical_values = Self::new_with_pk(row_with_id, schema, ctx, row_id_index);
214        Ok((logical_values.into(), col_index_mapping))
215    }
216}
217
#[cfg(test)]
mod tests {
    use risingwave_common::types::Datum;

    use super::*;
    use crate::optimizer::optimizer_context::OptimizerContext;

    /// Builds an `Int32` literal expression holding `val`.
    fn literal(val: i32) -> ExprImpl {
        Literal::new(Datum::Some(val.into()), DataType::Int32).into()
    }

    /// Pruning
    /// ```text
    /// Values([[0, 1, 2], [3, 4, 5]])
    /// ```
    /// with required columns [0, 2] will result in
    /// ```text
    /// Values([[0, 2], [3, 5]])
    /// ```
    #[tokio::test]
    async fn test_prune_col() {
        let ctx = OptimizerContext::mock().await;
        let schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Int32, "v2"),
            Field::with_name(DataType::Int32, "v3"),
        ]);
        // Values([[0, 1, 2], [3, 4, 5]])
        let values: PlanRef = LogicalValues::new(
            vec![
                vec![literal(0), literal(1), literal(2)],
                vec![literal(3), literal(4), literal(5)],
            ],
            schema,
            ctx,
        )
        .into();

        let required_cols = vec![0, 2];
        let pruned = values.prune_col(
            &required_cols,
            &mut ColumnPruningContext::new(values.clone()),
        );

        let values = pruned.as_logical_values().unwrap();

        // The pruned schema keeps only the required fields, in the requested order.
        let field_names = values
            .schema()
            .fields()
            .iter()
            .map(|f| f.name.as_str())
            .collect_vec();
        assert_eq!(field_names, vec!["v1", "v3"]);

        let rows: &[Vec<ExprImpl>] = values.rows();

        // expected output: Values([[0, 2], [3, 5]])
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].len(), 2);
        assert_eq!(rows[0][0], literal(0));
        assert_eq!(rows[0][1], literal(2));
        assert_eq!(rows[1].len(), 2);
        assert_eq!(rows[1][0], literal(3));
        assert_eq!(rows[1][1], literal(5));
    }
}