// risingwave_frontend/optimizer/plan_node/stream_topn.rs
use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::generic::{DistillUnit, TopNLimit};
19use super::stream::prelude::*;
20use super::utils::{Distill, plan_node_name};
21use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode, generic};
22use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
23use crate::optimizer::property::{Distribution, MonotonicityMap, Order, WatermarkColumns};
24use crate::stream_fragmenter::BuildFragmentGraphState;
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
28pub struct StreamTopN {
29 pub base: PlanBase<Stream>,
30 core: generic::TopN<PlanRef>,
31}
32
33impl StreamTopN {
34 pub fn new(core: generic::TopN<PlanRef>) -> Self {
35 assert!(core.group_key.is_empty());
36 assert!(core.limit_attr.limit() > 0);
37 let input = &core.input;
38 let dist = match input.distribution() {
39 Distribution::Single => Distribution::Single,
40 _ => panic!(),
41 };
42 let watermark_columns = WatermarkColumns::new();
43
44 let base = PlanBase::new_stream_with_core(
45 &core,
46 dist,
47 false,
48 false,
49 watermark_columns,
50 MonotonicityMap::new(),
51 );
52 StreamTopN { base, core }
53 }
54
55 pub fn limit_attr(&self) -> TopNLimit {
56 self.core.limit_attr
57 }
58
59 pub fn offset(&self) -> u64 {
60 self.core.offset
61 }
62
63 pub fn topn_order(&self) -> &Order {
64 &self.core.order
65 }
66}
67
68impl Distill for StreamTopN {
69 fn distill<'a>(&self) -> XmlNode<'a> {
70 let name = plan_node_name!("StreamTopN",
71 { "append_only", self.input().append_only() },
72 );
73 self.core.distill_with_name(name)
74 }
75}
76
77impl PlanTreeNodeUnary for StreamTopN {
78 fn input(&self) -> PlanRef {
79 self.core.input.clone()
80 }
81
82 fn clone_with_input(&self, input: PlanRef) -> Self {
83 let mut core = self.core.clone();
84 core.input = input;
85 Self::new(core)
86 }
87}
88
89impl_plan_tree_node_for_unary! { StreamTopN }
90
91impl StreamNode for StreamTopN {
92 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
93 use risingwave_pb::stream_plan::*;
94
95 let input = self.input();
96 let topn_node = TopNNode {
97 limit: self.limit_attr().limit(),
98 offset: self.offset(),
99 with_ties: self.limit_attr().with_ties(),
100 table: Some(
101 self.core
102 .infer_internal_table_catalog(
103 input.schema(),
104 input.ctx(),
105 input.expect_stream_key(),
106 None,
107 )
108 .with_id(state.gen_table_id_wrapped())
109 .to_internal_table_prost(),
110 ),
111 order_by: self.topn_order().to_protobuf(),
112 };
113 if self.input().append_only() {
114 PbNodeBody::AppendOnlyTopN(Box::new(topn_node))
115 } else {
116 PbNodeBody::TopN(Box::new(topn_node))
117 }
118 }
119}
120
121impl ExprRewritable for StreamTopN {}
122
123impl ExprVisitable for StreamTopN {}