risingwave_frontend/optimizer/plan_node/
stream_group_topn.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::generic::{DistillUnit, TopNLimit};
19use super::stream::prelude::*;
20use super::utils::{Distill, plan_node_name, watermark_pretty};
21use super::{
22 ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, StreamPlanRef as PlanRef, generic,
23};
24use crate::error::Result;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::generic::GenericPlanNode;
27use crate::optimizer::property::{MonotonicityMap, Order};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamGroupTopN {
32 pub base: PlanBase<Stream>,
33 core: generic::TopN<PlanRef>,
34 vnode_col_idx: Option<usize>,
37}
38
39impl StreamGroupTopN {
40 pub fn new(core: generic::TopN<PlanRef>, vnode_col_idx: Option<usize>) -> Result<Self> {
41 assert!(!core.group_key.is_empty());
42 assert!(core.limit_attr.limit() > 0);
43 reject_upsert_input!(core.input);
44
45 let input = &core.input;
46
47 let watermark_columns = if input.append_only() {
50 input.watermark_columns().clone()
51 } else {
52 input.watermark_columns().retain_clone(&core.group_key)
53 };
54
55 let mut stream_key = core
56 .stream_key()
57 .expect("logical node should have stream key here");
58 if let Some(vnode_col_idx) = vnode_col_idx
59 && stream_key.len() > 1
60 {
61 stream_key.remove(stream_key.iter().position(|i| *i == vnode_col_idx).unwrap());
66 }
67
68 let base = PlanBase::new_stream(
69 core.ctx(),
70 core.schema(),
71 Some(stream_key),
72 core.functional_dependency(),
73 input.distribution().clone(),
74 StreamKind::Retract,
75 false,
77 watermark_columns,
78 MonotonicityMap::new(), );
80
81 Ok(StreamGroupTopN {
82 base,
83 core,
84 vnode_col_idx,
85 })
86 }
87
88 pub fn limit_attr(&self) -> TopNLimit {
89 self.core.limit_attr
90 }
91
92 pub fn offset(&self) -> u64 {
93 self.core.offset
94 }
95
96 pub fn topn_order(&self) -> &Order {
97 &self.core.order
98 }
99
100 pub fn group_key(&self) -> &[usize] {
101 &self.core.group_key
102 }
103}
104
105impl StreamNode for StreamGroupTopN {
106 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
107 use risingwave_pb::stream_plan::*;
108
109 let input = self.input();
110 let table = self
111 .core
112 .infer_internal_table_catalog(
113 input.schema(),
114 input.ctx(),
115 input.expect_stream_key(),
116 self.vnode_col_idx,
117 )
118 .with_id(state.gen_table_id_wrapped());
119 assert!(!self.group_key().is_empty());
120 let group_topn_node = GroupTopNNode {
121 limit: self.limit_attr().limit(),
122 offset: self.offset(),
123 with_ties: self.limit_attr().with_ties(),
124 group_key: self.group_key().iter().map(|idx| *idx as u32).collect(),
125 table: Some(table.to_internal_table_prost()),
126 order_by: self.topn_order().to_protobuf(),
127 };
128 if self.input().append_only() {
129 PbNodeBody::AppendOnlyGroupTopN(Box::new(group_topn_node))
130 } else {
131 PbNodeBody::GroupTopN(Box::new(group_topn_node))
132 }
133 }
134}
135
136impl Distill for StreamGroupTopN {
137 fn distill<'a>(&self) -> XmlNode<'a> {
138 let name = plan_node_name!("StreamGroupTopN",
139 { "append_only", self.input().append_only() },
140 );
141 let mut node = self.core.distill_with_name(name);
142 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
143 node.fields.push(("output_watermarks".into(), ow));
144 }
145 node
146 }
147}
148
149impl_plan_tree_node_for_unary! { Stream, StreamGroupTopN }
150
151impl PlanTreeNodeUnary<Stream> for StreamGroupTopN {
152 fn input(&self) -> PlanRef {
153 self.core.input.clone()
154 }
155
156 fn clone_with_input(&self, input: PlanRef) -> Self {
157 let mut core = self.core.clone();
158 core.input = input;
159 Self::new(core, self.vnode_col_idx).unwrap()
160 }
161}
162
163impl ExprRewritable<Stream> for StreamGroupTopN {}
164
165impl ExprVisitable for StreamGroupTopN {}