risingwave_frontend/optimizer/plan_node/
batch_values.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::ValuesNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::batch_plan::values_node::ExprTuple;
19
20use super::batch::prelude::*;
21use super::utils::{Distill, childless_record};
22use super::{
23 ExprRewritable, LogicalValues, PlanBase, PlanRef, PlanTreeNodeLeaf, ToBatchPb,
24 ToDistributedBatch,
25};
26use crate::error::Result;
27use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
28use crate::optimizer::plan_node::ToLocalBatch;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::property::{Distribution, Order};
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct BatchValues {
34 pub base: PlanBase<Batch>,
35 logical: LogicalValues,
36}
37
38impl PlanTreeNodeLeaf for BatchValues {}
39impl_plan_tree_node_for_leaf!(BatchValues);
40
41impl BatchValues {
42 pub fn new(logical: LogicalValues) -> Self {
43 Self::with_dist(logical, Distribution::Single)
44 }
45
46 pub fn with_dist(logical: LogicalValues, dist: Distribution) -> Self {
47 let ctx = logical.base.ctx().clone();
48 let base = PlanBase::new_batch(ctx, logical.schema().clone(), dist, Order::any());
49 BatchValues { base, logical }
50 }
51
52 #[must_use]
54 pub fn logical(&self) -> &LogicalValues {
55 &self.logical
56 }
57
58 fn row_to_protobuf(&self, row: &[ExprImpl]) -> ExprTuple {
59 let cells = row.iter().map(|x| x.to_expr_proto()).collect();
60 ExprTuple { cells }
61 }
62}
63
64impl Distill for BatchValues {
65 fn distill<'a>(&self) -> XmlNode<'a> {
66 let data = self.logical.rows_pretty();
67 childless_record("BatchValues", vec![("rows", data)])
68 }
69}
70
71impl ToDistributedBatch for BatchValues {
72 fn to_distributed(&self) -> Result<PlanRef> {
73 Ok(Self::with_dist(self.logical().clone(), Distribution::Single).into())
74 }
75}
76
77impl ToBatchPb for BatchValues {
78 fn to_batch_prost_body(&self) -> NodeBody {
79 NodeBody::Values(ValuesNode {
80 tuples: self
81 .logical
82 .rows()
83 .iter()
84 .map(|row| self.row_to_protobuf(row))
85 .collect(),
86 fields: self
87 .logical
88 .schema()
89 .fields()
90 .iter()
91 .map(|f| f.to_prost())
92 .collect(),
93 })
94 }
95}
96
97impl ToLocalBatch for BatchValues {
98 fn to_local(&self) -> Result<PlanRef> {
99 Ok(Self::with_dist(self.logical().clone(), Distribution::Single).into())
100 }
101}
102
103impl ExprRewritable for BatchValues {
104 fn has_rewritable_expr(&self) -> bool {
105 true
106 }
107
108 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
109 Self::new(
110 self.logical
111 .rewrite_exprs(r)
112 .as_logical_values()
113 .unwrap()
114 .clone(),
115 )
116 .into()
117 }
118}
119
120impl ExprVisitable for BatchValues {
121 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
122 self.logical
123 .rows()
124 .iter()
125 .flatten()
126 .for_each(|e| v.visit_expr(e));
127 }
128}