// risingwave_frontend/optimizer/plan_node/stream_values.rs
use fixedbitset::FixedBitSet;
use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::values_node::ExprTuple;
use risingwave_pb::stream_plan::ValuesNode;
use super::stream::prelude::*;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, LogicalValues, PlanBase, StreamNode};
use crate::expr::{Expr, ExprImpl, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;
/// Stream plan node that wraps a [`LogicalValues`] together with the
/// stream-specific [`PlanBase`] built from it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamValues {
/// Stream plan base (schema, stream key, distribution, ...) created in `new`.
pub base: PlanBase<Stream>,
/// The logical values node this stream node was built from.
logical: LogicalValues,
}
// NOTE(review): presumably registers `StreamValues` as a leaf plan node (no
// input plans) -- confirm against the macro definition.
impl_plan_tree_node_for_leaf! { StreamValues }
impl StreamValues {
    /// Builds a `StreamValues` node on top of `logical`.
    ///
    /// The schema, stream key and functional dependencies are copied from
    /// `logical`. The node is always created with `Distribution::Single`, an
    /// all-zero bit set with one slot per schema column, and an empty
    /// `MonotonicityMap`.
    pub fn new(logical: LogicalValues) -> Self {
        let ctx = logical.ctx();
        let schema = logical.schema().clone();
        let stream_key = logical.stream_key().map(|key| key.to_vec());
        let functional_dependency = logical.functional_dependency().clone();
        let column_count = logical.schema().len();

        let base = PlanBase::new_stream(
            ctx,
            schema,
            stream_key,
            functional_dependency,
            Distribution::Single,
            // NOTE(review): the two flags are presumably `append_only` and
            // `emit_on_window_close` -- confirm against `PlanBase::new_stream`.
            true,
            false,
            // No bit is set, i.e. no column is marked in this set.
            FixedBitSet::with_capacity(column_count),
            MonotonicityMap::new(),
        );

        Self { base, logical }
    }

    /// Returns the wrapped logical values node.
    pub fn logical(&self) -> &LogicalValues {
        &self.logical
    }

    /// Converts one row of expressions into its protobuf `ExprTuple`, keeping
    /// the cell order of `row`.
    fn row_to_protobuf(&self, row: &[ExprImpl]) -> ExprTuple {
        ExprTuple {
            cells: row.iter().map(|expr| expr.to_expr_proto()).collect(),
        }
    }
}
impl Distill for StreamValues {
    /// Renders this node as a childless `StreamValues` record whose only
    /// field, `rows`, is the pretty-printed rows of the logical node.
    fn distill<'a>(&self) -> XmlNode<'a> {
        let fields = vec![("rows", self.logical.rows_pretty())];
        childless_record("StreamValues", fields)
    }
}
impl StreamNode for StreamValues {
    /// Serializes this node into a `ValuesNode` protobuf body.
    ///
    /// Every logical row becomes one `ExprTuple` and every schema field is
    /// converted with `to_prost`. The fragment graph state is not used.
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> ProstStreamNode {
        let logical = &self.logical;

        let tuples = logical
            .rows()
            .iter()
            .map(|row| self.row_to_protobuf(row))
            .collect();
        let fields = logical
            .schema()
            .fields()
            .iter()
            .map(|field| field.to_prost())
            .collect();

        ProstStreamNode::Values(ValuesNode { tuples, fields })
    }
}
impl ExprRewritable for StreamValues {
    /// This node always reports that it has rewritable expressions.
    fn has_rewritable_expr(&self) -> bool {
        true
    }

    /// Rewrites the expressions of the wrapped logical node with `r` and
    /// builds a fresh `StreamValues` from the result.
    ///
    /// # Panics
    ///
    /// Panics if the rewritten plan is not a `LogicalValues`.
    fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> crate::PlanRef {
        let rewritten = self.logical.rewrite_exprs(r);
        let values = rewritten.as_logical_values().unwrap().clone();
        Self::new(values).into()
    }
}
impl ExprVisitable for StreamValues {
    /// Visits every expression of every row with `v`, row by row and in cell
    /// order within each row.
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        for row in self.logical.rows().iter() {
            for expr in row {
                v.visit_expr(expr);
            }
        }
    }
}