risingwave_frontend/optimizer/plan_node/
logical_values.rs
1use std::sync::Arc;
16use std::vec;
17
18use itertools::Itertools;
19use pretty_xmlish::{Pretty, XmlNode};
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::types::{DataType, ScalarImpl};
22
23use super::generic::GenericPlanRef;
24use super::utils::{Distill, childless_record};
25use super::{
26 BatchValues, ColPrunable, ExprRewritable, Logical, LogicalFilter, PlanBase, PlanRef,
27 PredicatePushdown, StreamValues, ToBatch, ToStream,
28};
29use crate::error::Result;
30use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor, Literal};
31use crate::optimizer::optimizer_context::OptimizerContextRef;
32use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
33use crate::optimizer::plan_node::{
34 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
35};
36use crate::optimizer::property::FunctionalDependencySet;
37use crate::utils::{ColIndexMapping, Condition};
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct LogicalValues {
42 pub base: PlanBase<Logical>,
43 rows: Arc<[Vec<ExprImpl>]>,
44}
45
46impl LogicalValues {
47 pub fn new(rows: Vec<Vec<ExprImpl>>, schema: Schema, ctx: OptimizerContextRef) -> Self {
49 for exprs in &rows {
50 for (i, expr) in exprs.iter().enumerate() {
51 assert_eq!(schema.fields()[i].data_type(), expr.return_type())
52 }
53 }
54 let functional_dependency = FunctionalDependencySet::new(schema.len());
55 let base = PlanBase::new_logical(ctx, schema, None, functional_dependency);
56 Self {
57 rows: rows.into(),
58 base,
59 }
60 }
61
62 fn new_with_pk(
64 rows: Vec<Vec<ExprImpl>>,
65 schema: Schema,
66 ctx: OptimizerContextRef,
67 pk_index: usize,
68 ) -> Self {
69 for exprs in &rows {
70 for (i, expr) in exprs.iter().enumerate() {
71 assert_eq!(schema.fields()[i].data_type(), expr.return_type())
72 }
73 }
74 let functional_dependency = FunctionalDependencySet::new(schema.len());
75 let base = PlanBase::new_logical(ctx, schema, Some(vec![pk_index]), functional_dependency);
76 Self {
77 rows: rows.into(),
78 base,
79 }
80 }
81
82 pub fn create(rows: Vec<Vec<ExprImpl>>, schema: Schema, ctx: OptimizerContextRef) -> PlanRef {
84 Self::new(rows, schema, ctx).into()
86 }
87
88 pub fn rows(&self) -> &[Vec<ExprImpl>] {
90 self.rows.as_ref()
91 }
92
93 pub(super) fn rows_pretty<'a>(&self) -> Pretty<'a> {
94 let data = self
95 .rows()
96 .iter()
97 .map(|row| {
98 let collect = row.iter().map(Pretty::debug).collect();
99 Pretty::Array(collect)
100 })
101 .collect();
102 Pretty::Array(data)
103 }
104}
105
106impl_plan_tree_node_for_leaf! { LogicalValues }
107impl Distill for LogicalValues {
108 fn distill<'a>(&self) -> XmlNode<'a> {
109 let data = self.rows_pretty();
110 let fields = vec![("rows", data), ("schema", Pretty::debug(&self.schema()))];
111 childless_record("LogicalValues", fields)
112 }
113}
114
115impl ExprRewritable for LogicalValues {
116 fn has_rewritable_expr(&self) -> bool {
117 true
118 }
119
120 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
121 let mut new = self.clone();
122 new.rows = new
123 .rows
124 .iter()
125 .map(|exprs| {
126 exprs
127 .iter()
128 .map(|e| r.rewrite_expr(e.clone()))
129 .collect::<Vec<_>>()
130 })
131 .collect::<Vec<_>>()
132 .into();
133 new.base = new.base.clone_with_new_plan_id();
134 new.into()
135 }
136}
137
138impl ExprVisitable for LogicalValues {
139 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
140 self.rows.iter().flatten().for_each(|e| v.visit_expr(e));
141 }
142}
143
144impl ColPrunable for LogicalValues {
145 fn prune_col(&self, required_cols: &[usize], _ctx: &mut ColumnPruningContext) -> PlanRef {
146 let rows = self
147 .rows
148 .iter()
149 .map(|row| required_cols.iter().map(|i| row[*i].clone()).collect())
150 .collect();
151 let fields = required_cols
152 .iter()
153 .map(|i| self.schema().fields[*i].clone())
154 .collect();
155 Self::new(rows, Schema { fields }, self.base.ctx().clone()).into()
156 }
157}
158
159impl PredicatePushdown for LogicalValues {
160 fn predicate_pushdown(
161 &self,
162 predicate: Condition,
163 _ctx: &mut PredicatePushdownContext,
164 ) -> PlanRef {
165 LogicalFilter::create(self.clone().into(), predicate)
166 }
167}
168
169impl ToBatch for LogicalValues {
170 fn to_batch(&self) -> Result<PlanRef> {
171 Ok(BatchValues::new(self.clone()).into())
172 }
173}
174
175impl ToStream for LogicalValues {
176 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
177 Ok(StreamValues::new(self.clone()).into())
178 }
179
180 fn logical_rewrite_for_stream(
181 &self,
182 _ctx: &mut RewriteStreamContext,
183 ) -> Result<(PlanRef, ColIndexMapping)> {
184 let row_id_index = self.schema().len();
185 let col_index_mapping = ColIndexMapping::identity_or_none(row_id_index, row_id_index + 1);
186 let ctx = self.ctx();
187 let mut schema = self.schema().clone();
188 schema
189 .fields
190 .push(Field::with_name(DataType::Int64, "_row_id"));
191 let rows = self.rows().to_owned();
192 let row_with_id = rows
193 .into_iter()
194 .enumerate()
195 .map(|(i, mut r)| {
196 r.push(Literal::new(Some(ScalarImpl::Int64(i as i64)), DataType::Int64).into());
197 r
198 })
199 .collect_vec();
200 let logical_values = Self::new_with_pk(row_with_id, schema, ctx, row_id_index);
201 Ok((logical_values.into(), col_index_mapping))
202 }
203}
204
#[cfg(test)]
mod tests {

    use risingwave_common::types::Datum;

    use super::*;
    use crate::optimizer::optimizer_context::OptimizerContext;

    /// Builds an `Int32` literal expression holding `val`.
    fn literal(val: i32) -> ExprImpl {
        Literal::new(Datum::Some(val.into()), DataType::Int32).into()
    }

    /// Pruning a three-column, two-row values node down to columns 0 and 2 must keep both
    /// rows and only the first and third expression of each.
    #[tokio::test]
    async fn test_prune_filter() {
        let ctx = OptimizerContext::mock().await;
        let fields = ["v1", "v2", "v3"]
            .into_iter()
            .map(|name| Field::with_name(DataType::Int32, name))
            .collect();
        let schema = Schema::new(fields);
        let input_rows = vec![
            vec![literal(0), literal(1), literal(2)],
            vec![literal(3), literal(4), literal(5)],
        ];
        let values: PlanRef = LogicalValues::new(input_rows, schema, ctx).into();

        let required_cols: [usize; 2] = [0, 2];
        let pruned = values.prune_col(
            &required_cols,
            &mut ColumnPruningContext::new(values.clone()),
        );

        let values = pruned.as_logical_values().unwrap();
        let rows: &[Vec<ExprImpl>] = values.rows();

        // A single slice comparison checks the row count, each row's width and every cell.
        let expected = [
            vec![literal(0), literal(2)],
            vec![literal(3), literal(5)],
        ];
        assert_eq!(rows, &expected[..]);
    }
}