risingwave_frontend/optimizer/plan_node/
stream_group_topn.rs1use pretty_xmlish::XmlNode;
16use risingwave_pb::stream_plan::stream_node::PbNodeBody;
17
18use super::generic::{DistillUnit, TopNLimit};
19use super::stream::prelude::*;
20use super::utils::{Distill, plan_node_name, watermark_pretty};
21use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode, generic};
22use crate::PlanRef;
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::generic::GenericPlanNode;
25use crate::optimizer::property::{MonotonicityMap, Order};
26use crate::stream_fragmenter::BuildFragmentGraphState;
27
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct StreamGroupTopN {
30 pub base: PlanBase<Stream>,
31 core: generic::TopN<PlanRef>,
32 vnode_col_idx: Option<usize>,
35}
36
37impl StreamGroupTopN {
38 pub fn new(core: generic::TopN<PlanRef>, vnode_col_idx: Option<usize>) -> Self {
39 assert!(!core.group_key.is_empty());
40 assert!(core.limit_attr.limit() > 0);
41 let input = &core.input;
42
43 let watermark_columns = if input.append_only() {
46 input.watermark_columns().clone()
47 } else {
48 input.watermark_columns().retain_clone(&core.group_key)
49 };
50
51 let mut stream_key = core
52 .stream_key()
53 .expect("logical node should have stream key here");
54 if let Some(vnode_col_idx) = vnode_col_idx
55 && stream_key.len() > 1
56 {
57 stream_key.remove(stream_key.iter().position(|i| *i == vnode_col_idx).unwrap());
62 }
63
64 let base = PlanBase::new_stream(
65 core.ctx(),
66 core.schema(),
67 Some(stream_key),
68 core.functional_dependency(),
69 input.distribution().clone(),
70 false,
71 false,
73 watermark_columns,
74 MonotonicityMap::new(), );
76 StreamGroupTopN {
77 base,
78 core,
79 vnode_col_idx,
80 }
81 }
82
83 pub fn limit_attr(&self) -> TopNLimit {
84 self.core.limit_attr
85 }
86
87 pub fn offset(&self) -> u64 {
88 self.core.offset
89 }
90
91 pub fn topn_order(&self) -> &Order {
92 &self.core.order
93 }
94
95 pub fn group_key(&self) -> &[usize] {
96 &self.core.group_key
97 }
98}
99
100impl StreamNode for StreamGroupTopN {
101 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
102 use risingwave_pb::stream_plan::*;
103
104 let input = self.input();
105 let table = self
106 .core
107 .infer_internal_table_catalog(
108 input.schema(),
109 input.ctx(),
110 input.expect_stream_key(),
111 self.vnode_col_idx,
112 )
113 .with_id(state.gen_table_id_wrapped());
114 assert!(!self.group_key().is_empty());
115 let group_topn_node = GroupTopNNode {
116 limit: self.limit_attr().limit(),
117 offset: self.offset(),
118 with_ties: self.limit_attr().with_ties(),
119 group_key: self.group_key().iter().map(|idx| *idx as u32).collect(),
120 table: Some(table.to_internal_table_prost()),
121 order_by: self.topn_order().to_protobuf(),
122 };
123 if self.input().append_only() {
124 PbNodeBody::AppendOnlyGroupTopN(Box::new(group_topn_node))
125 } else {
126 PbNodeBody::GroupTopN(Box::new(group_topn_node))
127 }
128 }
129}
130
131impl Distill for StreamGroupTopN {
132 fn distill<'a>(&self) -> XmlNode<'a> {
133 let name = plan_node_name!("StreamGroupTopN",
134 { "append_only", self.input().append_only() },
135 );
136 let mut node = self.core.distill_with_name(name);
137 if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
138 node.fields.push(("output_watermarks".into(), ow));
139 }
140 node
141 }
142}
143
144impl_plan_tree_node_for_unary! { StreamGroupTopN }
145
146impl PlanTreeNodeUnary for StreamGroupTopN {
147 fn input(&self) -> PlanRef {
148 self.core.input.clone()
149 }
150
151 fn clone_with_input(&self, input: PlanRef) -> Self {
152 let mut core = self.core.clone();
153 core.input = input;
154 Self::new(core, self.vnode_col_idx)
155 }
156}
157
158impl ExprRewritable for StreamGroupTopN {}
159
160impl ExprVisitable for StreamGroupTopN {}