risingwave_frontend/planner/
query.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use fixedbitset::FixedBitSet;
16
17use crate::binder::BoundQuery;
18use crate::error::Result;
19use crate::optimizer::PlanRoot;
20use crate::optimizer::plan_node::{LogicalLimit, LogicalTopN};
21use crate::optimizer::property::{Order, RequiredDist};
22use crate::planner::Planner;
23
/// Row count substituted for a missing `LIMIT` when a query only specifies `OFFSET`
/// (see [`Planner::plan_query`]). Equal to `u64::MAX / 2`.
pub const LIMIT_ALL_COUNT: u64 = u64::MAX >> 1;
25
26impl Planner {
27    /// Plan a [`BoundQuery`]. Need to bind before planning.
28    ///
29    /// Works for both batch query and streaming query (`CREATE MATERIALIZED VIEW`).
30    pub fn plan_query(&mut self, query: BoundQuery) -> Result<PlanRoot> {
31        let out_names = query.schema().names();
32        let BoundQuery {
33            body,
34            order,
35            limit,
36            offset,
37            with_ties,
38            extra_order_exprs,
39        } = query;
40
41        let extra_order_exprs_len = extra_order_exprs.len();
42        let mut plan = self.plan_set_expr(body, extra_order_exprs, &order)?;
43        let mut order = Order {
44            column_orders: order,
45        };
46
47        if limit.is_some() || offset.is_some() {
48            // Optimize order key if using it for TopN / Limit.
49            // Both are singleton dist, so we can leave dist_key_indices as empty.
50            let func_dep = plan.functional_dependency();
51            order = func_dep.minimize_order_key(order, &[]);
52
53            let limit = limit.unwrap_or(LIMIT_ALL_COUNT);
54
55            let offset = offset.unwrap_or_default();
56            plan = if order.column_orders.is_empty() {
57                // Should be rejected by parser.
58                assert!(!with_ties);
59                // Create a logical limit if with limit/offset but without order-by
60                LogicalLimit::create(plan, limit, offset)
61            } else {
62                // Create a logical top-n if with limit/offset and order-by
63                LogicalTopN::create(plan, limit, offset, order.clone(), with_ties, vec![])?
64            }
65        }
66        let mut out_fields = FixedBitSet::with_capacity(plan.schema().len());
67        out_fields.insert_range(..plan.schema().len() - extra_order_exprs_len);
68        if let Some(field) = plan.schema().fields.first()
69            && field.name == "projected_row_id"
70        {
71            // Do not output projected_row_id hidden column.
72            out_fields.set(0, false);
73        }
74        let root =
75            PlanRoot::new_with_logical_plan(plan, RequiredDist::Any, order, out_fields, out_names);
76        Ok(root)
77    }
78}