risingwave_frontend/planner/
query.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use fixedbitset::FixedBitSet;

use crate::binder::BoundQuery;
use crate::error::Result;
use crate::optimizer::plan_node::{LogicalLimit, LogicalTopN};
use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::PlanRoot;
use crate::planner::Planner;

/// Row limit substituted by [`Planner::plan_query`] when a query supplies an offset
/// but no explicit limit.
///
/// NOTE(review): presumably halved from `u64::MAX` so that adding an offset to it
/// cannot overflow — confirm against the limit/top-n implementations.
pub const LIMIT_ALL_COUNT: u64 = u64::MAX / 2;

impl Planner {
    /// Build a [`PlanRoot`] from a [`BoundQuery`]. The query must already be bound.
    ///
    /// Shared by batch queries and streaming queries (`CREATE MATERIALIZED VIEW`).
    ///
    /// The query body is planned first. If a limit and/or an offset is present, the
    /// plan is wrapped in either a logical limit (no effective order key) or a
    /// logical top-n (with an order key). Columns that exist only to evaluate
    /// `ORDER BY` expressions, and a leading `projected_row_id` column, are left
    /// out of the root's output fields.
    ///
    /// # Errors
    ///
    /// Propagates any error from planning the set expression or from creating the
    /// top-n node.
    ///
    /// # Panics
    ///
    /// Panics if `with_ties` is set while the (minimized) order key is empty.
    pub fn plan_query(&mut self, query: BoundQuery) -> Result<PlanRoot> {
        // Capture the output names before the query is taken apart.
        let out_names = query.schema().names();
        let BoundQuery {
            body,
            order: column_orders,
            limit,
            offset,
            with_ties,
            extra_order_exprs,
        } = query;

        // Number of trailing columns that only exist to evaluate the order-by.
        let hidden_order_cols = extra_order_exprs.len();
        let mut plan = self.plan_set_expr(body, extra_order_exprs, &column_orders)?;
        let mut order = Order { column_orders };

        let has_limit_clause = limit.is_some() || offset.is_some();
        if has_limit_clause {
            // The order key is about to drive a TopN / Limit, so shrink it first.
            // Both operators are singleton-distributed, hence the empty dist key.
            order = plan
                .functional_dependency()
                .minimize_order_key(order, &[]);

            let row_limit = limit.unwrap_or(LIMIT_ALL_COUNT);
            let row_offset = offset.unwrap_or_default();
            plan = if !order.column_orders.is_empty() {
                // limit/offset together with an order-by: logical top-n.
                LogicalTopN::create(
                    plan,
                    row_limit,
                    row_offset,
                    order.clone(),
                    with_ties,
                    Vec::new(),
                )?
            } else {
                // `WITH TIES` without an order should have been rejected by the parser.
                assert!(!with_ties);
                // limit/offset without an order-by: plain logical limit.
                LogicalLimit::create(plan, row_limit, row_offset)
            };
        }

        // Expose every column except the trailing order-by-only ones.
        let schema_len = plan.schema().len();
        let mut out_fields = FixedBitSet::with_capacity(schema_len);
        out_fields.insert_range(..schema_len - hidden_order_cols);

        let leads_with_row_id = plan
            .schema()
            .fields
            .first()
            .is_some_and(|field| field.name == "projected_row_id");
        if leads_with_row_id {
            // The projected_row_id column is hidden and must not be output.
            out_fields.set(0, false);
        }

        Ok(PlanRoot::new_with_logical_plan(
            plan,
            RequiredDist::Any,
            order,
            out_fields,
            out_names,
        ))
    }
}