risingwave_frontend/optimizer/plan_node/
batch_over_window.rs1use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
16use risingwave_pb::batch_plan::SortOverWindowNode;
17use risingwave_pb::batch_plan::plan_node::NodeBody;
18
19use super::batch::prelude::*;
20use super::generic::PlanWindowFunction;
21use super::utils::impl_distill_by_unit;
22use super::{
23 ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
24 ToLocalBatch, generic,
25};
26use crate::error::Result;
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::property::{Order, RequiredDist};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31pub struct BatchOverWindow {
32 pub base: PlanBase<Batch>,
33 core: generic::OverWindow<PlanRef>,
34}
35
36impl BatchOverWindow {
37 pub fn new(core: generic::OverWindow<PlanRef>) -> Self {
38 assert!(core.funcs_have_same_partition_and_order());
39
40 let input = &core.input;
41 let input_dist = input.distribution().clone();
42
43 let order = Order::new(
44 core.partition_key_indices()
45 .into_iter()
46 .map(|idx| ColumnOrder::new(idx, OrderType::default()))
47 .chain(core.order_key().iter().cloned())
48 .collect(),
49 );
50
51 let base = PlanBase::new_batch_with_core(&core, input_dist, order);
52 BatchOverWindow { base, core }
53 }
54
55 fn expected_input_order(&self) -> Order {
56 self.order().clone()
57 }
58}
59
60impl_distill_by_unit!(BatchOverWindow, core, "BatchOverWindow");
61
62impl PlanTreeNodeUnary for BatchOverWindow {
63 fn input(&self) -> PlanRef {
64 self.core.input.clone()
65 }
66
67 fn clone_with_input(&self, input: PlanRef) -> Self {
68 let mut core = self.core.clone();
69 core.input = input;
70 Self::new(core)
71 }
72}
73
74impl_plan_tree_node_for_unary! { BatchOverWindow }
75
76impl ToDistributedBatch for BatchOverWindow {
77 fn to_distributed(&self) -> Result<PlanRef> {
78 let new_input = self.input().to_distributed_with_required(
79 &self.expected_input_order(),
80 &RequiredDist::shard_by_key(
81 self.input().schema().len(),
82 &self.core.partition_key_indices(),
83 ),
84 )?;
85 Ok(self.clone_with_input(new_input).into())
86 }
87}
88
89impl ToLocalBatch for BatchOverWindow {
90 fn to_local(&self) -> Result<PlanRef> {
91 let new_input = self.input().to_local()?;
92 let new_input = RequiredDist::single()
93 .enforce_if_not_satisfies(new_input, &self.expected_input_order())?;
94 Ok(self.clone_with_input(new_input).into())
95 }
96}
97
98impl ToBatchPb for BatchOverWindow {
99 fn to_batch_prost_body(&self) -> NodeBody {
100 let calls = self
101 .core
102 .window_functions()
103 .iter()
104 .map(PlanWindowFunction::to_protobuf)
105 .collect();
106 let partition_by = self
107 .core
108 .partition_key_indices()
109 .into_iter()
110 .map(|idx| idx as _)
111 .collect();
112 let order_by = self
113 .core
114 .order_key()
115 .iter()
116 .map(ColumnOrder::to_protobuf)
117 .collect();
118
119 NodeBody::SortOverWindow(SortOverWindowNode {
120 calls,
121 partition_by,
122 order_by,
123 })
124 }
125}
126
127impl ExprRewritable for BatchOverWindow {}
128
129impl ExprVisitable for BatchOverWindow {}