risingwave_frontend/optimizer/plan_node/
stream_expand.rsuse fixedbitset::FixedBitSet;
use risingwave_pb::stream_plan::expand_node::Subset;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::ExpandNode;
use super::stream::prelude::*;
use super::utils::impl_distill_by_unit;
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::{Distribution, MonotonicityMap};
use crate::stream_fragmenter::BuildFragmentGraphState;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamExpand {
pub base: PlanBase<Stream>,
core: generic::Expand<PlanRef>,
}
impl StreamExpand {
pub fn new(core: generic::Expand<PlanRef>) -> Self {
let input = core.input.clone();
let input_len = input.schema().len();
let dist = match input.distribution() {
Distribution::Single => Distribution::Single,
Distribution::SomeShard
| Distribution::HashShard(_)
| Distribution::UpstreamHashShard(_, _) => Distribution::SomeShard,
Distribution::Broadcast => unreachable!(),
};
let mut watermark_columns = FixedBitSet::with_capacity(core.output_len());
watermark_columns.extend(input.watermark_columns().ones().map(|idx| idx + input_len));
let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
input.emit_on_window_close(),
watermark_columns,
MonotonicityMap::new(),
);
StreamExpand { base, core }
}
pub fn column_subsets(&self) -> &[Vec<usize>] {
&self.core.column_subsets
}
}
impl PlanTreeNodeUnary for StreamExpand {
fn input(&self) -> PlanRef {
self.core.input.clone()
}
fn clone_with_input(&self, input: PlanRef) -> Self {
let mut core = self.core.clone();
core.input = input;
Self::new(core)
}
}
impl_plan_tree_node_for_unary! { StreamExpand }
impl_distill_by_unit!(StreamExpand, core, "StreamExpand");
impl StreamNode for StreamExpand {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
PbNodeBody::Expand(ExpandNode {
column_subsets: self
.column_subsets()
.iter()
.map(|subset| subset_to_protobuf(subset))
.collect(),
})
}
}
fn subset_to_protobuf(subset: &[usize]) -> Subset {
let column_indices = subset.iter().map(|key| *key as u32).collect();
Subset { column_indices }
}
impl ExprRewritable for StreamExpand {}
impl ExprVisitable for StreamExpand {}