risingwave_frontend/optimizer/plan_node/
batch_over_window.rs1use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
16use risingwave_pb::batch_plan::SortOverWindowNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::generic::PlanWindowFunction;
21use super::utils::impl_distill_by_unit;
22use super::{
23 BatchPlanRef as PlanRef, ExprRewritable, PlanBase, PlanTreeNodeUnary, ToBatchPb,
24 ToDistributedBatch, ToLocalBatch, generic,
25};
26use crate::error::Result;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Order, RequiredDist};
29
/// Batch plan node for over-window computation, wrapping a
/// [`generic::OverWindow`] core.
///
/// It is serialized as a `SortOverWindowNode` by its `ToBatchPb`
/// implementation in this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchOverWindow {
    /// Batch plan base; `new` builds it from the core, the input's
    /// distribution and the order derived from the partition/order keys.
    pub base: PlanBase<Batch>,
    /// Generic over-window core: the input plan plus the window functions,
    /// partition key indices and order key it exposes.
    core: generic::OverWindow<PlanRef>,
}
35
36impl BatchOverWindow {
37 pub fn new(core: generic::OverWindow<PlanRef>) -> Self {
38 assert!(core.funcs_have_same_partition_and_order());
39
40 let input = &core.input;
41 let input_dist = input.distribution().clone();
42
43 let order = Order::new(
44 core.partition_key_indices()
45 .into_iter()
46 .map(|idx| ColumnOrder::new(idx, OrderType::default()))
47 .chain(core.order_key().iter().cloned())
48 .collect(),
49 );
50
51 let base = PlanBase::new_batch_with_core(&core, input_dist, order);
52 BatchOverWindow { base, core }
53 }
54
55 fn expected_input_order(&self) -> Order {
56 self.order().clone()
57 }
58}
59
// Presumably generates the plan-explain ("distill") implementation for
// `BatchOverWindow` by delegating to its `core` field under the display name
// "BatchOverWindow" — the macro body is defined elsewhere; confirm there.
impl_distill_by_unit!(BatchOverWindow, core, "BatchOverWindow");
61
62impl PlanTreeNodeUnary<Batch> for BatchOverWindow {
63 fn input(&self) -> PlanRef {
64 self.core.input.clone()
65 }
66
67 fn clone_with_input(&self, input: PlanRef) -> Self {
68 let mut core = self.core.clone();
69 core.input = input;
70 Self::new(core)
71 }
72}
73
// Presumably derives the general plan-tree-node implementation for the
// `Batch` convention from the `PlanTreeNodeUnary` impl above — the macro is
// defined elsewhere; confirm there.
impl_plan_tree_node_for_unary! { Batch, BatchOverWindow }
75
76impl ToDistributedBatch for BatchOverWindow {
77 fn to_distributed(&self) -> Result<PlanRef> {
78 let partition_key_indices = self.core.partition_key_indices();
79 let required_dist = if partition_key_indices.is_empty() {
80 RequiredDist::single()
81 } else {
82 RequiredDist::shard_by_key(self.input().schema().len(), &partition_key_indices)
83 };
84 let new_input = self
85 .input()
86 .to_distributed_with_required(&self.expected_input_order(), &required_dist)?;
87 Ok(self.clone_with_input(new_input).into())
88 }
89}
90
91impl ToLocalBatch for BatchOverWindow {
92 fn to_local(&self) -> Result<PlanRef> {
93 let new_input = self.input().to_local()?;
94 let new_input = RequiredDist::single()
95 .batch_enforce_if_not_satisfies(new_input, &self.expected_input_order())?;
96 Ok(self.clone_with_input(new_input).into())
97 }
98}
99
100impl ToBatchPb for BatchOverWindow {
101 fn to_batch_prost_body(&self) -> NodeBody {
102 let calls = self
103 .core
104 .window_functions()
105 .iter()
106 .map(PlanWindowFunction::to_protobuf)
107 .collect();
108 let partition_by = self
109 .core
110 .partition_key_indices()
111 .into_iter()
112 .map(|idx| idx as _)
113 .collect();
114 let order_by = self
115 .core
116 .order_key()
117 .iter()
118 .copied()
119 .map(ColumnOrder::to_protobuf)
120 .collect();
121
122 NodeBody::SortOverWindow(SortOverWindowNode {
123 calls,
124 partition_by,
125 order_by,
126 })
127 }
128}
129
/// Empty impl: `BatchOverWindow` overrides nothing and relies entirely on the
/// trait's provided defaults.
impl ExprRewritable<Batch> for BatchOverWindow {}
131
/// Empty impl: `BatchOverWindow` overrides nothing and relies entirely on the
/// trait's provided defaults.
impl ExprVisitable for BatchOverWindow {}