risingwave_frontend/optimizer/plan_node/
logical_values.rs1use std::sync::Arc;
16use std::vec;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::types::{DataType, ScalarImpl};
22
23use super::generic::GenericPlanRef;
24use super::utils::{Distill, childless_record};
25use super::{
26 BatchValues, ColPrunable, ExprRewritable, Logical, LogicalFilter, LogicalPlanRef as PlanRef,
27 PlanBase, PredicatePushdown, StreamValues, ToBatch, ToStream,
28};
29use crate::error::Result;
30use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, Literal};
31use crate::optimizer::optimizer_context::OptimizerContextRef;
32use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
33use crate::optimizer::plan_node::{
34 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
35};
36use crate::optimizer::property::FunctionalDependencySet;
37use crate::utils::{ColIndexMapping, Condition};
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct LogicalValues {
42 pub base: PlanBase<Logical>,
43 rows: Arc<[Vec<ExprImpl>]>,
44}
45
46impl LogicalValues {
47 pub fn new(rows: Vec<Vec<ExprImpl>>, schema: Schema, ctx: OptimizerContextRef) -> Self {
49 for exprs in &rows {
50 for (i, expr) in exprs.iter().enumerate() {
51 assert_eq!(schema.fields()[i].data_type(), expr.return_type())
52 }
53 }
54 let functional_dependency = FunctionalDependencySet::new(schema.len());
55 let base = PlanBase::new_logical(ctx, schema, None, functional_dependency);
56 Self {
57 rows: rows.into(),
58 base,
59 }
60 }
61
62 fn new_with_pk(
64 rows: Vec<Vec<ExprImpl>>,
65 schema: Schema,
66 ctx: OptimizerContextRef,
67 pk_index: usize,
68 ) -> Self {
69 for exprs in &rows {
70 for (i, expr) in exprs.iter().enumerate() {
71 assert_eq!(schema.fields()[i].data_type(), expr.return_type())
72 }
73 }
74 let functional_dependency = FunctionalDependencySet::new(schema.len());
75 let base = PlanBase::new_logical(ctx, schema, Some(vec![pk_index]), functional_dependency);
76 Self {
77 rows: rows.into(),
78 base,
79 }
80 }
81
82 pub fn create(rows: Vec<Vec<ExprImpl>>, schema: Schema, ctx: OptimizerContextRef) -> PlanRef {
84 Self::new(rows, schema, ctx).into()
86 }
87
88 pub fn create_empty_scalar(ctx: OptimizerContextRef) -> PlanRef {
90 Self::new(vec![vec![]], Schema::new(vec![]), ctx).into()
91 }
92
93 pub fn is_empty_scalar(&self) -> bool {
95 self.schema().is_empty() && self.rows.len() == 1 && self.rows[0].is_empty()
96 }
97
98 pub fn rows(&self) -> &[Vec<ExprImpl>] {
100 self.rows.as_ref()
101 }
102
103 pub(super) fn rows_pretty<'a>(&self) -> Pretty<'a> {
104 let data = self
105 .rows()
106 .iter()
107 .map(|row| {
108 let collect = row.iter().map(Pretty::debug).collect();
109 Pretty::Array(collect)
110 })
111 .collect();
112 Pretty::Array(data)
113 }
114}
115
116impl_plan_tree_node_for_leaf! { Logical, LogicalValues }
117impl Distill for LogicalValues {
118 fn distill<'a>(&self) -> XmlNode<'a> {
119 let data = self.rows_pretty();
120 let fields = vec![("rows", data), ("schema", Pretty::debug(&self.schema()))];
121 childless_record("LogicalValues", fields)
122 }
123}
124
125impl ExprRewritable<Logical> for LogicalValues {
126 fn has_rewritable_expr(&self) -> bool {
127 true
128 }
129
130 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
131 let mut new = self.clone();
132 new.rows = new
133 .rows
134 .iter()
135 .map(|exprs| {
136 exprs
137 .iter()
138 .map(|e| r.rewrite_expr(e.clone()))
139 .collect::<Vec<_>>()
140 })
141 .collect::<Vec<_>>()
142 .into();
143 new.base = new.base.clone_with_new_plan_id();
144 new.into()
145 }
146}
147
148impl ExprVisitable for LogicalValues {
149 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
150 self.rows.iter().flatten().for_each(|e| v.visit_expr(e));
151 }
152}
153
154impl ColPrunable for LogicalValues {
155 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
156 let rows = self
157 .rows
158 .iter()
159 .map(|row| required_cols.iter().map(|i| row[*i].clone()).collect())
160 .collect();
161 let fields = required_cols
162 .iter()
163 .map(|i| self.schema().fields[*i].clone())
164 .collect();
165 Self::new(rows, Schema { fields }, self.base.ctx()).into()
166 }
167}
168
169impl PredicatePushdown for LogicalValues {
170 fn predicate_pushdown(
171 &self,
172 predicate: Condition,
173 _ctx: &mut PredicatePushdownContext,
174 ) -> PlanRef {
175 LogicalFilter::create(self.clone().into(), predicate)
176 }
177}
178
179impl ToBatch for LogicalValues {
180 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
181 Ok(BatchValues::new(self.clone()).into())
182 }
183}
184
185impl ToStream for LogicalValues {
186 fn to_stream(
187 &self,
188 _ctx: &mut ToStreamContext,
189 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
190 Ok(StreamValues::new(self.clone()).into())
191 }
192
193 fn logical_rewrite_for_stream(
194 &self,
195 _ctx: &mut RewriteStreamContext,
196 ) -> Result<(PlanRef, ColIndexMapping)> {
197 let row_id_index = self.schema().len();
198 let col_index_mapping = ColIndexMapping::identity_or_none(row_id_index, row_id_index + 1);
199 let ctx = self.ctx();
200 let mut schema = self.schema().clone();
201 schema
202 .fields
203 .push(Field::with_name(DataType::Int64, "_row_id"));
204 let rows = self.rows().to_owned();
205 let row_with_id = rows
206 .into_iter()
207 .enumerate()
208 .map(|(i, mut r)| {
209 r.push(Literal::new(Some(ScalarImpl::Int64(i as i64)), DataType::Int64).into());
210 r
211 })
212 .collect_vec();
213 let logical_values = Self::new_with_pk(row_with_id, schema, ctx, row_id_index);
214 Ok((logical_values.into(), col_index_mapping))
215 }
216}
217
218#[cfg(test)]
219mod tests {
220
221 use risingwave_common::types::Datum;
222
223 use super::*;
224 use crate::optimizer::optimizer_context::OptimizerContext;
225
226 fn literal(val: i32) -> ExprImpl {
227 Literal::new(Datum::Some(val.into()), DataType::Int32).into()
228 }
229
230 #[tokio::test]
239 async fn test_prune_filter() {
240 let ctx = OptimizerContext::mock().await;
241 let schema = Schema::new(vec![
242 Field::with_name(DataType::Int32, "v1"),
243 Field::with_name(DataType::Int32, "v2"),
244 Field::with_name(DataType::Int32, "v3"),
245 ]);
246 let values: PlanRef = LogicalValues::new(
248 vec![
249 vec![literal(0), literal(1), literal(2)],
250 vec![literal(3), literal(4), literal(5)],
251 ],
252 schema,
253 ctx,
254 )
255 .into();
256
257 let required_cols = vec![0, 2];
258 let pruned = values.prune_col(
259 &required_cols,
260 &mut ColumnPruningContext::new(values.clone()),
261 );
262
263 let values = pruned.as_logical_values().unwrap();
264 let rows: &[Vec<ExprImpl>] = values.rows();
265
266 assert_eq!(rows.len(), 2);
268 assert_eq!(rows[0].len(), 2);
269 assert_eq!(rows[0][0], literal(0));
270 assert_eq!(rows[0][1], literal(2));
271 assert_eq!(rows[1].len(), 2);
272 assert_eq!(rows[1][0], literal(3));
273 assert_eq!(rows[1][1], literal(5));
274 }
275}