risingwave_frontend/optimizer/plan_node/
stream_sort.rs1use std::collections::HashSet;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::catalog::FieldDisplay;
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use super::stream::prelude::*;
23use super::utils::{Distill, TableCatalogBuilder, childless_record};
24use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
25use crate::TableCatalog;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{Monotonicity, MonotonicityMap, WatermarkColumns};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
/// Stream plan node that sorts its single input by one column.
///
/// The constructor in this file asserts that the sort column is a watermark column of the
/// input, and records that column as non-decreasing in the node's properties.
/// NOTE(review): "Eowc" presumably stands for "emit on window close" — confirm.
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamEowcSort {
    /// Plan properties of this node, built by `PlanBase::new_stream` in the constructor.
32 pub base: PlanBase<Stream>,
33
    /// The single child plan whose rows are sorted.
34 input: PlanRef,
    /// Index of the sort column; used to index into the input's schema.
35 sort_column_index: usize,
36}
37
38impl Distill for StreamEowcSort {
39 fn distill<'a>(&self) -> XmlNode<'a> {
40 let fields = vec![(
41 "sort_column",
42 Pretty::display(&FieldDisplay(&self.input.schema()[self.sort_column_index])),
43 )];
44 childless_record("StreamEowcSort", fields)
45 }
46}
47
48impl StreamEowcSort {
49 pub fn new(input: PlanRef, sort_column_index: usize) -> Self {
50 assert!(input.watermark_columns().contains(sort_column_index));
51
52 let schema = input.schema().clone();
53 let stream_key = input.stream_key().map(|v| v.to_vec());
54 let fd_set = input.functional_dependency().clone();
55 let dist = input.distribution().clone();
56
57 let mut watermark_columns = WatermarkColumns::new();
58 watermark_columns.insert(
59 sort_column_index,
60 input
63 .watermark_columns()
64 .get_group(sort_column_index)
65 .unwrap(),
66 );
67
68 let mut columns_monotonicity = MonotonicityMap::new();
70 columns_monotonicity.insert(sort_column_index, Monotonicity::NonDecreasing);
71
72 let base = PlanBase::new_stream(
73 input.ctx(),
74 schema,
75 stream_key,
76 fd_set,
77 dist,
78 true,
79 true,
80 watermark_columns,
81 columns_monotonicity,
82 );
83 Self {
84 base,
85 input,
86 sort_column_index,
87 }
88 }
89
90 fn infer_state_table(&self) -> TableCatalog {
91 let in_fields = self.input.schema().fields();
94 let mut tbl_builder = TableCatalogBuilder::default();
95 for field in in_fields {
96 tbl_builder.add_column(field);
97 }
98
99 let mut order_cols = HashSet::new();
100 tbl_builder.add_order_column(self.sort_column_index, OrderType::ascending());
101 order_cols.insert(self.sort_column_index);
102
103 let dist_key = self.base.distribution().dist_column_indices().to_vec();
104 for idx in &dist_key {
105 if !order_cols.contains(idx) {
106 tbl_builder.add_order_column(*idx, OrderType::ascending());
107 order_cols.insert(*idx);
108 }
109 }
110
111 for idx in self.input.expect_stream_key() {
112 if !order_cols.contains(idx) {
113 tbl_builder.add_order_column(*idx, OrderType::ascending());
114 order_cols.insert(*idx);
115 }
116 }
117
118 let read_prefix_len_hint = 0;
119 tbl_builder.build(dist_key, read_prefix_len_hint)
120 }
121}
122
123impl PlanTreeNodeUnary for StreamEowcSort {
124 fn input(&self) -> PlanRef {
125 self.input.clone()
126 }
127
128 fn clone_with_input(&self, input: PlanRef) -> Self {
129 Self::new(input, self.sort_column_index)
130 }
131}
132
// Expands the project macro `impl_plan_tree_node_for_unary!` for `StreamEowcSort`.
// NOTE(review): presumably this derives the generic plan-tree-node implementation from the
// `PlanTreeNodeUnary` impl above — the macro is defined outside this file, confirm there.
133impl_plan_tree_node_for_unary! { StreamEowcSort }
134
135impl StreamNode for StreamEowcSort {
136 fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
137 use risingwave_pb::stream_plan::*;
138 PbNodeBody::Sort(Box::new(SortNode {
139 state_table: Some(
140 self.infer_state_table()
141 .with_id(state.gen_table_id_wrapped())
142 .to_internal_table_prost(),
143 ),
144 sort_column_index: self.sort_column_index as _,
145 }))
146 }
147}
148
// Empty impl: `StreamEowcSort` relies entirely on the trait's provided methods. The node
// stores only a column index, no expressions.
// NOTE(review): presumably the defaults are no-ops — confirm in the trait definition.
149impl ExprRewritable for StreamEowcSort {}
150
// Empty impl: `StreamEowcSort` relies entirely on the trait's provided methods. The node
// stores only a column index, no expressions.
// NOTE(review): presumably the defaults visit nothing — confirm in the trait definition.
151impl ExprVisitable for StreamEowcSort {}