risingwave_frontend/optimizer/plan_node/
stream_topn.rs1use std::assert_matches::assert_matches;
16
17use pretty_xmlish::XmlNode;
18use risingwave_pb::stream_plan::stream_node::PbNodeBody;
19
20use super::generic::{DistillUnit, TopNLimit};
21use super::stream::prelude::*;
22use super::utils::{Distill, plan_node_name};
23use super::{
24 ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
25};
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{Distribution, MonotonicityMap, Order, WatermarkColumns};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub struct StreamTopN {
33 pub base: PlanBase<Stream>,
34 core: generic::TopN<PlanRef>,
35}
36
37impl StreamTopN {
38 pub fn new(core: generic::TopN<PlanRef>) -> Result<Self> {
39 assert!(core.group_key.is_empty());
40 assert!(core.limit_attr.limit() > 0);
41 let input = &core.input;
42 assert_matches!(input.distribution(), Distribution::Single);
43 reject_upsert_input!(input);
44 let watermark_columns = WatermarkColumns::new();
45
46 let base = PlanBase::new_stream_with_core(
47 &core,
48 Distribution::Single,
49 StreamKind::Retract,
50 false,
51 watermark_columns,
52 MonotonicityMap::new(),
53 );
54
55 Ok(StreamTopN { base, core })
56 }
57
58 pub fn limit_attr(&self) -> TopNLimit {
59 self.core.limit_attr
60 }
61
62 pub fn offset(&self) -> u64 {
63 self.core.offset
64 }
65
66 pub fn topn_order(&self) -> &Order {
67 &self.core.order
68 }
69}
70
71impl Distill for StreamTopN {
72 fn distill<'a>(&self) -> XmlNode<'a> {
73 let name = plan_node_name!("StreamTopN",
74 { "append_only", self.input().append_only() },
75 );
76 self.core.distill_with_name(name)
77 }
78}
79
80impl PlanTreeNodeUnary<Stream> for StreamTopN {
81 fn input(&self) -> PlanRef {
82 self.core.input.clone()
83 }
84
85 fn clone_with_input(&self, input: PlanRef) -> Self {
86 let mut core = self.core.clone();
87 core.input = input;
88 Self::new(core).unwrap()
89 }
90}
91
92impl_plan_tree_node_for_unary! { Stream, StreamTopN }
93
94impl StreamNode for StreamTopN {
95 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
96 use risingwave_pb::stream_plan::*;
97
98 let input = self.input();
99 let topn_node = TopNNode {
100 limit: self.limit_attr().limit(),
101 offset: self.offset(),
102 with_ties: self.limit_attr().with_ties(),
103 table: Some(
104 self.core
105 .infer_internal_table_catalog(
106 input.schema(),
107 input.ctx(),
108 input.expect_stream_key(),
109 None,
110 )
111 .with_id(state.gen_table_id_wrapped())
112 .to_internal_table_prost(),
113 ),
114 order_by: self.topn_order().to_protobuf(),
115 };
116 if self.input().append_only() {
117 PbNodeBody::AppendOnlyTopN(Box::new(topn_node))
118 } else {
119 PbNodeBody::TopN(Box::new(topn_node))
120 }
121 }
122}
123
124impl ExprRewritable<Stream> for StreamTopN {}
125
126impl ExprVisitable for StreamTopN {}