// risingwave_frontend/optimizer/plan_node/stream_project_set.rs
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::ProjectSetNode;
use super::stream::prelude::*;
use super::utils::impl_distill_by_unit;
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::{analyze_monotonicity, monotonicity_variants, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::ColIndexMappingRewriteExt;
/// Stream plan node wrapping a `generic::ProjectSet` core together with the
/// watermark bookkeeping computed in [`StreamProjectSet::new`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamProjectSet {
    /// Stream plan base, built in `new` via `PlanBase::new_stream_with_core`.
    pub base: PlanBase<Stream>,
    /// The generic project-set core; provides the input plan and the select list.
    core: generic::ProjectSet<PlanRef>,
    /// `(input column index, select-list expression index)` pairs, recorded for
    /// every expression whose monotonicity follows an input column that is one
    /// of the input's watermark columns.
    watermark_derivations: Vec<(usize, usize)>,
    /// Select-list indices of expressions that `analyze_monotonicity` reports as
    /// inherently non-decreasing and not constant.
    nondecreasing_exprs: Vec<usize>,
}
impl StreamProjectSet {
    /// Builds the stream node from a generic `ProjectSet` core.
    ///
    /// * The distribution is the input's distribution rewritten through the
    ///   core's input-to-output column mapping.
    /// * The append-only and emit-on-window-close flags are taken from the
    ///   input unchanged.
    /// * Every select-list expression is classified with `analyze_monotonicity`
    ///   to decide which output columns are marked as watermark columns, and to
    ///   fill `watermark_derivations` / `nondecreasing_exprs`.
    pub fn new(core: generic::ProjectSet<PlanRef>) -> Self {
        use monotonicity_variants::*;

        let input = core.input.clone();
        let distribution = core
            .i2o_col_mapping()
            .rewrite_provided_distribution(input.distribution());

        let mut watermark_derivations = Vec::new();
        let mut nondecreasing_exprs = Vec::new();
        let mut out_watermark_columns = FixedBitSet::with_capacity(core.output_len());

        for (expr_idx, expr) in core.select_list.iter().enumerate() {
            // Each arm records its bookkeeping entry (if any) and reports
            // whether the matching output column carries a watermark.
            let carries_watermark = match analyze_monotonicity(expr) {
                Inherent(monotonicity)
                    if monotonicity.is_non_decreasing() && !monotonicity.is_constant() =>
                {
                    nondecreasing_exprs.push(expr_idx);
                    true
                }
                FollowingInput(input_idx) if input.watermark_columns().contains(input_idx) => {
                    watermark_derivations.push((input_idx, expr_idx));
                    true
                }
                // Listed explicitly (no wildcard) so that a newly added
                // variant produces a compile error here.
                Inherent(_) | FollowingInput(_) | _FollowingInputInversely(_) => false,
            };

            if carries_watermark {
                // Select item `i` maps to output column `i + 1`.
                // NOTE(review): presumably the ProjectSet output schema has one
                // leading column before the select list; confirm against
                // `generic::ProjectSet`.
                out_watermark_columns.insert(expr_idx + 1);
            }
        }

        let append_only = input.append_only();
        let emit_on_window_close = input.emit_on_window_close();
        let base = PlanBase::new_stream_with_core(
            &core,
            distribution,
            append_only,
            emit_on_window_close,
            out_watermark_columns,
            // A freshly constructed map: no monotonicity info is derived here.
            MonotonicityMap::new(),
        );

        Self {
            base,
            core,
            watermark_derivations,
            nondecreasing_exprs,
        }
    }
}
// NOTE(review): presumably generates the explain/distill implementation for
// `StreamProjectSet` by delegating to its `core` field under the display name
// "StreamProjectSet"; confirm against the macro definition.
impl_distill_by_unit!(StreamProjectSet, core, "StreamProjectSet");
// NOTE(review): presumably derives the general plan-tree-node implementation
// from the `PlanTreeNodeUnary` impl; confirm against the macro definition.
impl_plan_tree_node_for_unary! { StreamProjectSet }
impl PlanTreeNodeUnary for StreamProjectSet {
fn input(&self) -> PlanRef {
self.core.input.clone()
}
fn clone_with_input(&self, input: PlanRef) -> Self {
let mut core = self.core.clone();
core.input = input;
Self::new(core)
}
}
impl StreamNode for StreamProjectSet {
    /// Serializes this node into a `ProjectSetNode` protobuf body.
    ///
    /// `watermark_derivations` is split into two parallel index lists (input
    /// columns and expression indices); all indices are narrowed with `as`
    /// casts. `_state` is not used.
    fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
        let derivation_count = self.watermark_derivations.len();
        let mut watermark_input_cols = Vec::with_capacity(derivation_count);
        let mut watermark_expr_indices = Vec::with_capacity(derivation_count);
        for &(input_col, expr_idx) in &self.watermark_derivations {
            watermark_input_cols.push(input_col as u32);
            watermark_expr_indices.push(expr_idx as u32);
        }

        let select_list = self
            .core
            .select_list
            .iter()
            .map(|item| item.to_project_set_select_item_proto())
            .collect_vec();

        let nondecreasing_exprs = self
            .nondecreasing_exprs
            .iter()
            .map(|&expr_idx| expr_idx as _)
            .collect();

        PbNodeBody::ProjectSet(ProjectSetNode {
            select_list,
            watermark_input_cols,
            watermark_expr_indices,
            nondecreasing_exprs,
        })
    }
}
impl ExprRewritable for StreamProjectSet {
    /// Always reports `true` for this node.
    fn has_rewritable_expr(&self) -> bool {
        true
    }

    /// Applies `r` to a clone of the core and rebuilds the node from it via
    /// [`StreamProjectSet::new`], returning the result as a `PlanRef`.
    /// `self` is left untouched.
    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
        let rewritten = {
            let mut core = self.core.clone();
            core.rewrite_exprs(r);
            core
        };
        Self::new(rewritten).into()
    }
}
impl ExprVisitable for StreamProjectSet {
    /// Forwards the visitor `v` to the core's `visit_exprs`.
    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
        let Self { core, .. } = self;
        core.visit_exprs(v);
    }
}