risingwave_frontend/optimizer/plan_node/
batch_values.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_pb::batch_plan::ValuesNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18use risingwave_pb::batch_plan::values_node::ExprTuple;
19
20use super::batch::prelude::*;
21use super::utils::{Distill, childless_record};
22use super::{
23    ExprRewritable, LogicalValues, PlanBase, PlanRef, PlanTreeNodeLeaf, ToBatchPb,
24    ToDistributedBatch,
25};
26use crate::error::Result;
27use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor};
28use crate::optimizer::plan_node::ToLocalBatch;
29use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
30use crate::optimizer::property::{Distribution, Order};
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct BatchValues {
34    pub base: PlanBase<Batch>,
35    logical: LogicalValues,
36}
37
38impl PlanTreeNodeLeaf for BatchValues {}
39impl_plan_tree_node_for_leaf!(BatchValues);
40
41impl BatchValues {
42    pub fn new(logical: LogicalValues) -> Self {
43        Self::with_dist(logical, Distribution::Single)
44    }
45
46    pub fn with_dist(logical: LogicalValues, dist: Distribution) -> Self {
47        let ctx = logical.base.ctx().clone();
48        let base = PlanBase::new_batch(ctx, logical.schema().clone(), dist, Order::any());
49        BatchValues { base, logical }
50    }
51
52    /// Get a reference to the batch values's logical.
53    #[must_use]
54    pub fn logical(&self) -> &LogicalValues {
55        &self.logical
56    }
57
58    fn row_to_protobuf(&self, row: &[ExprImpl]) -> ExprTuple {
59        let cells = row.iter().map(|x| x.to_expr_proto()).collect();
60        ExprTuple { cells }
61    }
62}
63
64impl Distill for BatchValues {
65    fn distill<'a>(&self) -> XmlNode<'a> {
66        let data = self.logical.rows_pretty();
67        childless_record("BatchValues", vec![("rows", data)])
68    }
69}
70
71impl ToDistributedBatch for BatchValues {
72    fn to_distributed(&self) -> Result<PlanRef> {
73        Ok(Self::with_dist(self.logical().clone(), Distribution::Single).into())
74    }
75}
76
77impl ToBatchPb for BatchValues {
78    fn to_batch_prost_body(&self) -> NodeBody {
79        NodeBody::Values(ValuesNode {
80            tuples: self
81                .logical
82                .rows()
83                .iter()
84                .map(|row| self.row_to_protobuf(row))
85                .collect(),
86            fields: self
87                .logical
88                .schema()
89                .fields()
90                .iter()
91                .map(|f| f.to_prost())
92                .collect(),
93        })
94    }
95}
96
97impl ToLocalBatch for BatchValues {
98    fn to_local(&self) -> Result<PlanRef> {
99        Ok(Self::with_dist(self.logical().clone(), Distribution::Single).into())
100    }
101}
102
103impl ExprRewritable for BatchValues {
104    fn has_rewritable_expr(&self) -> bool {
105        true
106    }
107
108    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
109        Self::new(
110            self.logical
111                .rewrite_exprs(r)
112                .as_logical_values()
113                .unwrap()
114                .clone(),
115        )
116        .into()
117    }
118}
119
120impl ExprVisitable for BatchValues {
121    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
122        self.logical
123            .rows()
124            .iter()
125            .flatten()
126            .for_each(|e| v.visit_expr(e));
127    }
128}