risingwave_frontend/optimizer/plan_node/
stream_values.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::ValuesNode;
17use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
18use risingwave_pb::stream_plan::values_node::ExprTuple;
19
20use super::stream::prelude::*;
21use super::utils::{Distill, childless_record};
22use super::{ExprRewritable, LogicalValues, PlanBase, StreamNode};
23use crate::expr::{Expr, ExprImpl, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::property::{Distribution, MonotonicityMap, WatermarkColumns};
26use crate::stream_fragmenter::BuildFragmentGraphState;
27
28/// `StreamValues` implements `LogicalValues.to_stream()`
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct StreamValues {
31    pub base: PlanBase<Stream>,
32    logical: LogicalValues,
33}
34
35impl_plan_tree_node_for_leaf! { StreamValues }
36
37impl StreamValues {
38    /// `StreamValues` should enforce `Distribution::Single`
39    pub fn new(logical: LogicalValues) -> Self {
40        let ctx = logical.ctx();
41        let base = PlanBase::new_stream(
42            ctx,
43            logical.schema().clone(),
44            logical.stream_key().map(|v| v.to_vec()),
45            logical.functional_dependency().clone(),
46            Distribution::Single,
47            true,
48            false,
49            WatermarkColumns::new(),
50            MonotonicityMap::new(),
51        );
52        Self { base, logical }
53    }
54
55    pub fn logical(&self) -> &LogicalValues {
56        &self.logical
57    }
58
59    fn row_to_protobuf(&self, row: &[ExprImpl]) -> ExprTuple {
60        let cells = row.iter().map(|x| x.to_expr_proto()).collect();
61        ExprTuple { cells }
62    }
63}
64
65impl Distill for StreamValues {
66    fn distill<'a>(&self) -> XmlNode<'a> {
67        let data = self.logical.rows_pretty();
68        childless_record("StreamValues", vec![("rows", data)])
69    }
70}
71
72impl StreamNode for StreamValues {
73    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode {
74        ProstStreamNode::Values(Box::new(ValuesNode {
75            tuples: self
76                .logical
77                .rows()
78                .iter()
79                .map(|row| self.row_to_protobuf(row))
80                .collect(),
81            fields: self
82                .logical
83                .schema()
84                .fields()
85                .iter()
86                .map(|f| f.to_prost())
87                .collect(),
88        }))
89    }
90}
91
92impl ExprRewritable for StreamValues {
93    fn has_rewritable_expr(&self) -> bool {
94        true
95    }
96
97    fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> crate::PlanRef {
98        Self::new(
99            self.logical
100                .rewrite_exprs(r)
101                .as_logical_values()
102                .unwrap()
103                .clone(),
104        )
105        .into()
106    }
107}
108
109impl ExprVisitable for StreamValues {
110    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
111        self.logical
112            .rows()
113            .iter()
114            .flatten()
115            .for_each(|e| v.visit_expr(e));
116    }
117}