// Source: risingwave_frontend/optimizer/plan_node/batch_topn.rs
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::TopNNode;

use super::batch::prelude::*;
use super::generic::TopNLimit;
use super::utils::impl_distill_by_unit;
use super::{
    generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
};
use crate::error::Result;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::{BatchLimit, ToLocalBatch};
use crate::optimizer::property::{Order, RequiredDist};

/// `BatchTopN` implements [`super::LogicalTopN`] to find the top N elements with a heap
///
/// Only the non-grouped form is supported: the constructor asserts that the
/// core's `group_key` is empty.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchTopN {
    /// Batch plan base, built in the constructor from the core, the input's
    /// distribution and the core's order.
    pub base: PlanBase<Batch>,
    /// Generic TopN description: input plan, order, limit attributes, offset
    /// and (always empty here) group key.
    core: generic::TopN<PlanRef>,
}

impl BatchTopN {
    /// Creates a `BatchTopN` from a generic TopN core.
    ///
    /// The resulting node keeps the distribution of the core's input and
    /// declares the core's order as its own output order.
    ///
    /// # Panics
    ///
    /// Panics if `core.group_key` is not empty.
    pub fn new(core: generic::TopN<PlanRef>) -> Self {
        assert!(core.group_key.is_empty());
        let input_dist = core.input.distribution().clone();
        // BatchTopN outputs data in the order of specified order
        let output_order = core.order.clone();
        let base = PlanBase::new_batch_with_core(&core, input_dist, output_order);
        Self { base, core }
    }

    /// Plans the TopN as a partial step on `input` followed by a global TopN.
    ///
    /// The partial step keeps `limit + offset` rows with offset `0`. It is a
    /// `BatchLimit` when `input` is already ordered as required and ties are
    /// not requested, otherwise a `BatchTopN`. Its output is then forced to
    /// the single distribution (if not already satisfied) and fed into a copy
    /// of `self`, which applies the original limit and offset.
    ///
    /// # Errors
    ///
    /// Propagates the error from enforcing the single distribution.
    fn two_phase_topn(&self, input: PlanRef) -> Result<PlanRef> {
        let with_ties = self.core.limit_attr.with_ties();
        // NOTE(review): plain `+`; assumes limit and offset are bounded upstream
        // so that the sum cannot overflow — confirm.
        let partial_limit =
            TopNLimit::new(self.core.limit_attr.limit() + self.core.offset, with_ties);
        let partial_offset = 0;

        let input_already_sorted = input.order().satisfies(&self.core.order);
        let partial: PlanRef = if input_already_sorted && !with_ties {
            // Sorted input without ties: simply cutting off the prefix is enough.
            BatchLimit::new(generic::Limit::new(
                input,
                partial_limit.limit(),
                partial_offset,
            ))
            .into()
        } else {
            Self::new(generic::TopN::without_group(
                input,
                partial_limit,
                partial_offset,
                self.core.order.clone(),
            ))
            .into()
        };

        let gathered = RequiredDist::single().enforce_if_not_satisfies(partial, &Order::any())?;
        Ok(self.clone_with_input(gathered).into())
    }

    /// Plans the TopN as a single node directly on top of `input`.
    ///
    /// When `input` is already ordered as required and ties are not requested,
    /// a `BatchLimit` with the same limit and offset is produced; otherwise a
    /// copy of `self` with `input` as its child.
    fn one_phase_topn(&self, input: PlanRef) -> Result<PlanRef> {
        let needs_topn =
            !input.order().satisfies(&self.core.order) || self.core.limit_attr.with_ties();
        if needs_topn {
            return Ok(self.clone_with_input(input).into());
        }

        let limit = generic::Limit::new(input, self.core.limit_attr.limit(), self.core.offset);
        Ok(BatchLimit::new(limit).into())
    }
}

// NOTE(review): presumably generates the explain/`Distill` impl for `BatchTopN` by
// delegating to its `core` field under the display name "BatchTopN" — confirm
// against the macro definition in `super::utils`.
impl_distill_by_unit!(BatchTopN, core, "BatchTopN");

impl PlanTreeNodeUnary for BatchTopN {
    fn input(&self) -> PlanRef {
        self.core.input.clone()
    }

    fn clone_with_input(&self, input: PlanRef) -> Self {
        let mut core = self.core.clone();
        core.input = input;
        Self::new(core)
    }
}

// NOTE(review): presumably derives the general plan-tree-node impl for `BatchTopN`
// from its `PlanTreeNodeUnary` impl — confirm against the macro definition.
impl_plan_tree_node_for_unary! {BatchTopN}

impl ToDistributedBatch for BatchTopN {
    /// Converts the input to its distributed form, then plans the TopN on top.
    ///
    /// A one-phase plan is used when the converted input already satisfies the
    /// single distribution; otherwise the two-phase plan is used.
    ///
    /// # Errors
    ///
    /// Propagates errors from converting the input or from building the plan.
    fn to_distributed(&self) -> Result<PlanRef> {
        let distributed_input = self.input().to_distributed()?;
        let already_single = distributed_input
            .distribution()
            .satisfies(&RequiredDist::single());
        if !already_single {
            return self.two_phase_topn(distributed_input);
        }
        self.one_phase_topn(distributed_input)
    }
}

impl ToBatchPb for BatchTopN {
    /// Serializes this node as a `TopNNode` protobuf body carrying the limit,
    /// offset, column orders and the with-ties flag taken from the core.
    fn to_batch_prost_body(&self) -> NodeBody {
        let core = &self.core;
        let node = TopNNode {
            column_orders: core.order.to_protobuf(),
            limit: core.limit_attr.limit(),
            offset: core.offset,
            with_ties: core.limit_attr.with_ties(),
        };
        NodeBody::TopN(node)
    }
}

impl ToLocalBatch for BatchTopN {
    /// Converts the input to its local form, then plans the TopN on top.
    ///
    /// A one-phase plan is used when the converted input already satisfies the
    /// single distribution; otherwise the two-phase plan is used.
    ///
    /// # Errors
    ///
    /// Propagates errors from converting the input or from building the plan.
    fn to_local(&self) -> Result<PlanRef> {
        let local_input = self.input().to_local()?;
        let already_single = local_input
            .distribution()
            .satisfies(&RequiredDist::single());
        if !already_single {
            return self.two_phase_topn(local_input);
        }
        self.one_phase_topn(local_input)
    }
}

// No overrides: `BatchTopN` uses the trait's default method implementations.
impl ExprRewritable for BatchTopN {}

// No overrides: `BatchTopN` uses the trait's default method implementations.
impl ExprVisitable for BatchTopN {}