risingwave_frontend/optimizer/plan_node/
stream_sort.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use pretty_xmlish::{Pretty, XmlNode};
18use risingwave_common::catalog::FieldDisplay;
19use risingwave_common::util::sort_util::OrderType;
20use risingwave_pb::stream_plan::stream_node::PbNodeBody;
21
22use super::stream::prelude::*;
23use super::utils::{Distill, TableCatalogBuilder, childless_record};
24use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
25use crate::TableCatalog;
26use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
27use crate::optimizer::property::{Monotonicity, MonotonicityMap, WatermarkColumns};
28use crate::stream_fragmenter::BuildFragmentGraphState;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct StreamEowcSort {
32    pub base: PlanBase<Stream>,
33
34    input: PlanRef,
35    sort_column_index: usize,
36}
37
38impl Distill for StreamEowcSort {
39    fn distill<'a>(&self) -> XmlNode<'a> {
40        let fields = vec![(
41            "sort_column",
42            Pretty::display(&FieldDisplay(&self.input.schema()[self.sort_column_index])),
43        )];
44        childless_record("StreamEowcSort", fields)
45    }
46}
47
48impl StreamEowcSort {
49    pub fn new(input: PlanRef, sort_column_index: usize) -> Self {
50        assert!(input.watermark_columns().contains(sort_column_index));
51
52        let schema = input.schema().clone();
53        let stream_key = input.stream_key().map(|v| v.to_vec());
54        let fd_set = input.functional_dependency().clone();
55        let dist = input.distribution().clone();
56
57        let mut watermark_columns = WatermarkColumns::new();
58        watermark_columns.insert(
59            sort_column_index,
60            // `StreamSort` operator will propagate input watermark as it is,
61            // so we can assign the same watermark group.
62            input
63                .watermark_columns()
64                .get_group(sort_column_index)
65                .unwrap(),
66        );
67
68        // StreamEowcSort makes the sorting watermark column non-decreasing
69        let mut columns_monotonicity = MonotonicityMap::new();
70        columns_monotonicity.insert(sort_column_index, Monotonicity::NonDecreasing);
71
72        let base = PlanBase::new_stream(
73            input.ctx(),
74            schema,
75            stream_key,
76            fd_set,
77            dist,
78            true,
79            true,
80            watermark_columns,
81            columns_monotonicity,
82        );
83        Self {
84            base,
85            input,
86            sort_column_index,
87        }
88    }
89
90    fn infer_state_table(&self) -> TableCatalog {
91        // The sort state table has the same schema as the input.
92
93        let in_fields = self.input.schema().fields();
94        let mut tbl_builder = TableCatalogBuilder::default();
95        for field in in_fields {
96            tbl_builder.add_column(field);
97        }
98
99        let mut order_cols = HashSet::new();
100        tbl_builder.add_order_column(self.sort_column_index, OrderType::ascending());
101        order_cols.insert(self.sort_column_index);
102
103        let dist_key = self.base.distribution().dist_column_indices().to_vec();
104        for idx in &dist_key {
105            if !order_cols.contains(idx) {
106                tbl_builder.add_order_column(*idx, OrderType::ascending());
107                order_cols.insert(*idx);
108            }
109        }
110
111        for idx in self.input.expect_stream_key() {
112            if !order_cols.contains(idx) {
113                tbl_builder.add_order_column(*idx, OrderType::ascending());
114                order_cols.insert(*idx);
115            }
116        }
117
118        let read_prefix_len_hint = 0;
119        tbl_builder.build(dist_key, read_prefix_len_hint)
120    }
121}
122
123impl PlanTreeNodeUnary for StreamEowcSort {
124    fn input(&self) -> PlanRef {
125        self.input.clone()
126    }
127
128    fn clone_with_input(&self, input: PlanRef) -> Self {
129        Self::new(input, self.sort_column_index)
130    }
131}
132
// Expands the project macro for `StreamEowcSort`; judging by its name it
// derives the generic plan-tree-node impl from the `PlanTreeNodeUnary` impl
// in this file (macro body is defined elsewhere — confirm there).
impl_plan_tree_node_for_unary! { StreamEowcSort }
134
135impl StreamNode for StreamEowcSort {
136    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
137        use risingwave_pb::stream_plan::*;
138        PbNodeBody::Sort(Box::new(SortNode {
139            state_table: Some(
140                self.infer_state_table()
141                    .with_id(state.gen_table_id_wrapped())
142                    .to_internal_table_prost(),
143            ),
144            sort_column_index: self.sort_column_index as _,
145        }))
146    }
147}
148
// Empty impl: every `ExprRewritable` method comes from the trait itself.
// The struct stores only a plan base, an input and a column index, so there
// is no expression field to rewrite.
impl ExprRewritable for StreamEowcSort {}
150
// Empty impl: every `ExprVisitable` method comes from the trait itself.
// The struct stores only a plan base, an input and a column index, so there
// is no expression field to visit.
impl ExprVisitable for StreamEowcSort {}