risingwave_frontend/optimizer/plan_node/
stream_over_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
18use risingwave_pb::stream_plan::stream_node::PbNodeBody;
19
20use super::generic::{GenericPlanNode, PlanWindowFunction};
21use super::stream::prelude::*;
22use super::utils::{TableCatalogBuilder, impl_distill_by_unit};
23use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode, generic};
24use crate::TableCatalog;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
27use crate::stream_fragmenter::BuildFragmentGraphState;
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct StreamOverWindow {
31    pub base: PlanBase<Stream>,
32    core: generic::OverWindow<PlanRef>,
33}
34
35impl StreamOverWindow {
36    pub fn new(core: generic::OverWindow<PlanRef>) -> Self {
37        assert!(core.funcs_have_same_partition_and_order());
38
39        let input = &core.input;
40        let watermark_columns = WatermarkColumns::new();
41
42        let base = PlanBase::new_stream_with_core(
43            &core,
44            input.distribution().clone(),
45            false, // general over window cannot be append-only
46            false,
47            watermark_columns,
48            MonotonicityMap::new(), // TODO: derive monotonicity
49        );
50        StreamOverWindow { base, core }
51    }
52
53    fn infer_state_table(&self) -> TableCatalog {
54        let mut tbl_builder = TableCatalogBuilder::default();
55
56        let out_schema = self.core.schema();
57        for field in out_schema.fields() {
58            tbl_builder.add_column(field);
59        }
60
61        let mut order_cols = HashSet::new();
62        for idx in self.core.partition_key_indices() {
63            if order_cols.insert(idx) {
64                tbl_builder.add_order_column(idx, OrderType::ascending());
65            }
66        }
67        let read_prefix_len_hint = tbl_builder.get_current_pk_len();
68        for o in self.core.order_key() {
69            if order_cols.insert(o.column_index) {
70                tbl_builder.add_order_column(o.column_index, o.order_type);
71            }
72        }
73        for &idx in self.core.input.expect_stream_key() {
74            if order_cols.insert(idx) {
75                tbl_builder.add_order_column(idx, OrderType::ascending());
76            }
77        }
78
79        let in_dist_key = self.core.input.distribution().dist_column_indices();
80        tbl_builder.build(in_dist_key.to_vec(), read_prefix_len_hint)
81    }
82}
83
84impl_distill_by_unit!(StreamOverWindow, core, "StreamOverWindow");
85
86impl PlanTreeNodeUnary for StreamOverWindow {
87    fn input(&self) -> PlanRef {
88        self.core.input.clone()
89    }
90
91    fn clone_with_input(&self, input: PlanRef) -> Self {
92        let mut core = self.core.clone();
93        core.input = input;
94        Self::new(core)
95    }
96}
97impl_plan_tree_node_for_unary! { StreamOverWindow }
98
99impl StreamNode for StreamOverWindow {
100    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
101        use risingwave_pb::stream_plan::*;
102
103        let calls = self
104            .core
105            .window_functions()
106            .iter()
107            .map(PlanWindowFunction::to_protobuf)
108            .collect();
109        let partition_by = self
110            .core
111            .partition_key_indices()
112            .into_iter()
113            .map(|idx| idx as _)
114            .collect();
115        let order_by = self
116            .core
117            .order_key()
118            .iter()
119            .map(ColumnOrder::to_protobuf)
120            .collect();
121        let state_table = self
122            .infer_state_table()
123            .with_id(state.gen_table_id_wrapped())
124            .to_internal_table_prost();
125        let cache_policy = self
126            .base
127            .ctx()
128            .session_ctx()
129            .config()
130            .streaming_over_window_cache_policy();
131
132        PbNodeBody::OverWindow(Box::new(OverWindowNode {
133            calls,
134            partition_by,
135            order_by,
136            state_table: Some(state_table),
137            cache_policy: cache_policy.to_protobuf() as _,
138        }))
139    }
140}
141
142impl ExprRewritable for StreamOverWindow {}
143
144impl ExprVisitable for StreamOverWindow {}