risingwave_frontend/optimizer/plan_node/
stream_values.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::ValuesNode;
17use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
18use risingwave_pb::stream_plan::values_node::ExprTuple;
19
20use super::stream::prelude::*;
21use super::utils::{Distill, childless_record};
22use super::{ExprRewritable, LogicalValues, PlanBase, StreamNode};
23use crate::expr::{Expr, ExprImpl, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
26use crate::stream_fragmenter::BuildFragmentGraphState;
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct StreamValues {
31 pub base: PlanBase<Stream>,
32 logical: LogicalValues,
33}
34
35impl_plan_tree_node_for_leaf! { StreamValues }
36
37impl StreamValues {
38 pub fn new(logical: LogicalValues) -> Self {
40 let ctx = logical.ctx();
41 let base = PlanBase::new_stream(
42 ctx,
43 logical.schema().clone(),
44 logical.stream_key().map(|v| v.to_vec()),
45 logical.functional_dependency().clone(),
46 Distribution::Single,
47 true,
48 false,
49 WatermarkColumns::new(),
50 MonotonicityMap::new(),
51 );
52 Self { base, logical }
53 }
54
55 pub fn logical(&self) -> &LogicalValues {
56 &self.logical
57 }
58
59 fn row_to_protobuf(&self, row: &[ExprImpl]) -> ExprTuple {
60 let cells = row.iter().map(|x| x.to_expr_proto()).collect();
61 ExprTuple { cells }
62 }
63}
64
65impl Distill for StreamValues {
66 fn distill<'a>(&self) -> XmlNode<'a> {
67 let data = self.logical.rows_pretty();
68 childless_record("StreamValues", vec![("rows", data)])
69 }
70}
71
72impl StreamNode for StreamValues {
73 fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode {
74 ProstStreamNode::Values(Box::new(ValuesNode {
75 tuples: self
76 .logical
77 .rows()
78 .iter()
79 .map(|row| self.row_to_protobuf(row))
80 .collect(),
81 fields: self
82 .logical
83 .schema()
84 .fields()
85 .iter()
86 .map(|f| f.to_prost())
87 .collect(),
88 }))
89 }
90}
91
92impl ExprRewritable for StreamValues {
93 fn has_rewritable_expr(&self) -> bool {
94 true
95 }
96
97 fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> crate::PlanRef {
98 Self::new(
99 self.logical
100 .rewrite_exprs(r)
101 .as_logical_values()
102 .unwrap()
103 .clone(),
104 )
105 .into()
106 }
107}
108
109impl ExprVisitable for StreamValues {
110 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
111 self.logical
112 .rows()
113 .iter()
114 .flatten()
115 .for_each(|e| v.visit_expr(e));
116 }
117}