// risingwave_frontend/optimizer/plan_node/stream_exchange.rs
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode};
use super::stream::prelude::*;
use super::utils::{childless_record, plan_node_name, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::{
Distribution, DistributionDisplay, MonotonicityMap, RequiredDist,
};
use crate::stream_fragmenter::BuildFragmentGraphState;
/// Stream plan node that wraps a single input plan and carries the distribution
/// its output is dispatched with.
///
/// It is serialized as an `ExchangeNode` whose dispatch strategy is derived from
/// `no_shuffle` and from the distribution stored in `base`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamExchange {
    /// Plan properties of this node, built with `PlanBase::new_stream` from the
    /// input's properties.
    pub base: PlanBase<Stream>,
    /// The single child plan whose output is exchanged.
    input: PlanRef,
    /// `true` when built through `new_no_shuffle`, `false` when built through `new`.
    /// When set, the node is serialized with the `NoShuffle` dispatcher type.
    no_shuffle: bool,
}
impl StreamExchange {
pub fn new(input: PlanRef, dist: Distribution) -> Self {
let columns_monotonicity = if input.distribution().satisfies(&RequiredDist::single()) {
input.columns_monotonicity().clone()
} else {
MonotonicityMap::new()
};
assert!(!input.schema().is_empty());
let base = PlanBase::new_stream(
input.ctx(),
input.schema().clone(),
input.stream_key().map(|v| v.to_vec()),
input.functional_dependency().clone(),
dist,
input.append_only(), input.emit_on_window_close(),
input.watermark_columns().clone(),
columns_monotonicity,
);
StreamExchange {
base,
input,
no_shuffle: false,
}
}
pub fn new_no_shuffle(input: PlanRef) -> Self {
let ctx = input.ctx();
let base = PlanBase::new_stream(
ctx,
input.schema().clone(),
input.stream_key().map(|v| v.to_vec()),
input.functional_dependency().clone(),
input.distribution().clone(),
input.append_only(), input.emit_on_window_close(),
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
);
StreamExchange {
base,
input,
no_shuffle: true,
}
}
pub fn no_shuffle(&self) -> bool {
self.no_shuffle
}
}
impl Distill for StreamExchange {
    /// Renders this node as a childless record with a single `dist` field.
    ///
    /// The record name is produced by `plan_node_name!` from the literal
    /// `"StreamExchange"` and the `no_shuffle` flag. The `dist` field displays
    /// this node's distribution against the input's schema.
    fn distill<'a>(&self) -> XmlNode<'a> {
        let name = plan_node_name!(
            "StreamExchange",
            { "no_shuffle", self.no_shuffle },
        );
        let dist_display = DistributionDisplay {
            distribution: self.base.distribution(),
            input_schema: self.input.schema(),
        };
        let fields = vec![("dist", Pretty::display(&dist_display))];
        childless_record(name, fields)
    }
}
impl PlanTreeNodeUnary for StreamExchange {
fn input(&self) -> PlanRef {
self.input.clone()
}
fn clone_with_input(&self, input: PlanRef) -> Self {
if self.no_shuffle {
Self::new_no_shuffle(input)
} else {
Self::new(input, self.distribution().clone())
}
}
}
impl_plan_tree_node_for_unary! {StreamExchange}
impl StreamNode for StreamExchange {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> NodeBody {
NodeBody::Exchange(ExchangeNode {
strategy: if self.no_shuffle {
Some(DispatchStrategy {
r#type: DispatcherType::NoShuffle as i32,
dist_key_indices: vec![],
output_indices: (0..self.schema().len() as u32).collect(),
})
} else {
Some(DispatchStrategy {
r#type: match &self.base.distribution() {
Distribution::HashShard(_) => DispatcherType::Hash,
Distribution::Single => DispatcherType::Simple,
Distribution::Broadcast => DispatcherType::Broadcast,
_ => panic!("Do not allow Any or AnyShard in serialization process"),
} as i32,
dist_key_indices: match &self.base.distribution() {
Distribution::HashShard(keys) => {
keys.iter().map(|num| *num as u32).collect()
}
_ => vec![],
},
output_indices: (0..self.schema().len() as u32).collect(),
})
},
})
}
}
// Empty impl: no methods are overridden here, so `StreamExchange` relies on
// whatever defaults the `ExprRewritable` trait provides.
// NOTE(review): presumably a no-op because this node stores no expressions — confirm against the trait.
impl ExprRewritable for StreamExchange {}
// Empty impl: no methods are overridden here, so `StreamExchange` relies on
// whatever defaults the `ExprVisitable` trait provides.
// NOTE(review): presumably a no-op because this node stores no expressions — confirm against the trait.
impl ExprVisitable for StreamExchange {}