risingwave_frontend/optimizer/plan_node/
stream_eowc_over_window.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::util::sort_util::OrderType;
18use risingwave_pb::stream_plan::stream_node::PbNodeBody;
19
20use super::generic::{self, PlanWindowFunction};
21use super::stream::prelude::*;
22use super::utils::{TableCatalogBuilder, impl_distill_by_unit};
23use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
24use crate::TableCatalog;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::property::{MonotonicityMap, WatermarkColumns};
27use crate::stream_fragmenter::BuildFragmentGraphState;
28
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct StreamEowcOverWindow {
31    pub base: PlanBase<Stream>,
32    core: generic::OverWindow<PlanRef>,
33}
34
35impl StreamEowcOverWindow {
36    pub fn new(core: generic::OverWindow<PlanRef>) -> Self {
37        assert!(core.funcs_have_same_partition_and_order());
38
39        let input = &core.input;
40        assert!(input.append_only());
41        assert!(input.emit_on_window_close());
42
43        // Should order by a single watermark column.
44        let order_key = &core.window_functions[0].order_by;
45        assert_eq!(order_key.len(), 1);
46        assert_eq!(order_key[0].order_type, OrderType::ascending());
47        let order_key_idx = order_key[0].column_index;
48        let input_watermark_cols = core.input.watermark_columns();
49        assert!(input_watermark_cols.contains(order_key_idx));
50
51        // `EowcOverWindowExecutor` cannot produce any watermark columns, because there may be some
52        // ancient rows in some rarely updated partitions that are emitted at the end of time.
53        let watermark_columns = WatermarkColumns::new();
54
55        let base = PlanBase::new_stream_with_core(
56            &core,
57            input.distribution().clone(),
58            true,
59            true,
60            watermark_columns,
61            // we cannot derive monotonicity for any column for the same reason as watermark columns
62            MonotonicityMap::new(),
63        );
64        StreamEowcOverWindow { base, core }
65    }
66
67    fn window_functions(&self) -> &[PlanWindowFunction] {
68        &self.core.window_functions
69    }
70
71    fn partition_key_indices(&self) -> Vec<usize> {
72        self.window_functions()[0]
73            .partition_by
74            .iter()
75            .map(|i| i.index())
76            .collect()
77    }
78
79    fn order_key_index(&self) -> usize {
80        self.window_functions()[0].order_by[0].column_index
81    }
82
83    fn infer_state_table(&self) -> TableCatalog {
84        // The EOWC over window state table has the same schema as the input.
85
86        let in_fields = self.core.input.schema().fields();
87        let mut tbl_builder = TableCatalogBuilder::default();
88        for field in in_fields {
89            tbl_builder.add_column(field);
90        }
91
92        let mut order_cols = HashSet::new();
93        let partition_key_indices = self.partition_key_indices();
94        for idx in &partition_key_indices {
95            if !order_cols.contains(idx) {
96                tbl_builder.add_order_column(*idx, OrderType::ascending());
97                order_cols.insert(*idx);
98            }
99        }
100        let read_prefix_len_hint = tbl_builder.get_current_pk_len();
101        let order_key_index = self.order_key_index();
102        if !order_cols.contains(&order_key_index) {
103            tbl_builder.add_order_column(order_key_index, OrderType::ascending());
104            order_cols.insert(order_key_index);
105        }
106        for idx in self.core.input.expect_stream_key() {
107            if !order_cols.contains(idx) {
108                tbl_builder.add_order_column(*idx, OrderType::ascending());
109                order_cols.insert(*idx);
110            }
111        }
112
113        let in_dist_key = self.core.input.distribution().dist_column_indices();
114        tbl_builder.build(in_dist_key.to_vec(), read_prefix_len_hint)
115    }
116}
117
118impl_distill_by_unit!(StreamEowcOverWindow, core, "StreamEowcOverWindow");
119
120impl PlanTreeNodeUnary for StreamEowcOverWindow {
121    fn input(&self) -> PlanRef {
122        self.core.input.clone()
123    }
124
125    fn clone_with_input(&self, input: PlanRef) -> Self {
126        let mut core = self.core.clone();
127        core.input = input;
128        Self::new(core)
129    }
130}
131impl_plan_tree_node_for_unary! { StreamEowcOverWindow }
132
133impl StreamNode for StreamEowcOverWindow {
134    fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
135        use risingwave_pb::stream_plan::*;
136
137        let calls = self
138            .window_functions()
139            .iter()
140            .map(PlanWindowFunction::to_protobuf)
141            .collect();
142        let partition_by = self.window_functions()[0]
143            .partition_by
144            .iter()
145            .map(|i| i.index() as _)
146            .collect();
147        let order_by = self.window_functions()[0]
148            .order_by
149            .iter()
150            .map(|o| o.to_protobuf())
151            .collect();
152        let state_table = self
153            .infer_state_table()
154            .with_id(state.gen_table_id_wrapped())
155            .to_internal_table_prost();
156
157        PbNodeBody::EowcOverWindow(Box::new(EowcOverWindowNode {
158            calls,
159            partition_by,
160            order_by,
161            state_table: Some(state_table),
162        }))
163    }
164}
165
166impl ExprRewritable for StreamEowcOverWindow {}
167
168impl ExprVisitable for StreamEowcOverWindow {}