// risingwave_frontend/optimizer/plan_node/stream_topn.rs
use fixedbitset::FixedBitSet;
use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use super::generic::{DistillUnit, TopNLimit};
use super::stream::prelude::*;
use super::utils::{plan_node_name, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::property::{Distribution, MonotonicityMap, Order};
use crate::stream_fragmenter::BuildFragmentGraphState;
/// Stream plan node for a TopN that has no group key.
///
/// `StreamTopN::new` asserts that the core's group key is empty and that the
/// limit is greater than zero, and panics unless the input's distribution is
/// `Distribution::Single`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamTopN {
/// Stream plan base, built from `core` by `PlanBase::new_stream_with_core`
/// in `StreamTopN::new`.
pub base: PlanBase<Stream>,
/// Generic TopN core; carries the input plan, the order, the limit
/// attribute, the offset and the (always empty) group key.
core: generic::TopN<PlanRef>,
}
impl StreamTopN {
    /// Creates a `StreamTopN` from a generic TopN core.
    ///
    /// The node keeps the input's `Distribution::Single` distribution and
    /// starts with an all-clear watermark-column bitset sized to the input
    /// schema and an empty monotonicity map.
    ///
    /// # Panics
    ///
    /// Panics if `core.group_key` is not empty, if the limit is `0`, or if
    /// the input's distribution is anything other than
    /// `Distribution::Single`.
    pub fn new(core: generic::TopN<PlanRef>) -> Self {
        assert!(
            core.group_key.is_empty(),
            "StreamTopN does not accept a group key"
        );
        assert!(
            core.limit_attr.limit() > 0,
            "StreamTopN requires a limit greater than 0"
        );

        let input = &core.input;
        let dist = match input.distribution() {
            Distribution::Single => Distribution::Single,
            _ => panic!("StreamTopN requires its input to have `Distribution::Single`"),
        };

        // No watermark columns are set: the bitset only reserves one
        // (cleared) bit per input column.
        let watermark_columns = FixedBitSet::with_capacity(input.schema().len());

        let base = PlanBase::new_stream_with_core(
            &core,
            dist,
            // NOTE(review): the two flags below are presumably `append_only`
            // and `emit_on_window_close` — confirm against
            // `PlanBase::new_stream_with_core`.
            false,
            false,
            watermark_columns,
            MonotonicityMap::new(),
        );
        StreamTopN { base, core }
    }

    /// Returns the limit attribute stored in the TopN core.
    pub fn limit_attr(&self) -> TopNLimit {
        self.core.limit_attr
    }

    /// Returns the offset stored in the TopN core.
    pub fn offset(&self) -> u64 {
        self.core.offset
    }

    /// Returns the order stored in the TopN core.
    pub fn topn_order(&self) -> &Order {
        &self.core.order
    }
}
impl Distill for StreamTopN {
    /// Renders this node through the core's `distill_with_name`, using a
    /// node name produced by `plan_node_name!` from `"StreamTopN"` and the
    /// input's append-only flag.
    fn distill<'a>(&self) -> XmlNode<'a> {
        let is_append_only = self.input().append_only();
        let node_name = plan_node_name!("StreamTopN",
            { "append_only", is_append_only },
        );
        self.core.distill_with_name(node_name)
    }
}
impl PlanTreeNodeUnary for StreamTopN {
fn input(&self) -> PlanRef {
self.core.input.clone()
}
fn clone_with_input(&self, input: PlanRef) -> Self {
let mut core = self.core.clone();
core.input = input;
Self::new(core)
}
}
impl_plan_tree_node_for_unary! { StreamTopN }
impl StreamNode for StreamTopN {
    /// Serializes this node into a `TopNNode` protobuf body.
    ///
    /// The body is wrapped in `PbNodeBody::AppendOnlyTopN` when the input is
    /// append-only and in `PbNodeBody::TopN` otherwise. A fresh table id is
    /// taken from `state` for the internal table.
    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
        use risingwave_pb::stream_plan::*;

        let upstream = self.input();
        let limit_attr = self.limit_attr();

        // Internal table catalog inferred from the input's schema, context
        // and stream key, then tagged with a newly generated table id.
        let state_table = self
            .core
            .infer_internal_table_catalog(
                upstream.schema(),
                upstream.ctx(),
                upstream.expect_stream_key(),
                None,
            )
            .with_id(state.gen_table_id_wrapped())
            .to_internal_table_prost();

        let body = TopNNode {
            limit: limit_attr.limit(),
            offset: self.offset(),
            with_ties: limit_attr.with_ties(),
            table: Some(state_table),
            order_by: self.topn_order().to_protobuf(),
        };

        if upstream.append_only() {
            PbNodeBody::AppendOnlyTopN(body)
        } else {
            PbNodeBody::TopN(body)
        }
    }
}
// `StreamTopN` overrides nothing in `ExprRewritable`: the empty impl means
// the trait's default methods are used as-is.
impl ExprRewritable for StreamTopN {}
// Same for `ExprVisitable`: only the trait's default methods apply.
impl ExprVisitable for StreamTopN {}