risingwave_batch_executors/executor/aggregation/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module contains wrappers over `Aggregator` trait that implement `order by`, `filter`,
16//! `distinct` and project.
17
18mod distinct;
19mod filter;
20mod orderby;
21mod projection;
22
23use risingwave_expr::Result;
24use risingwave_expr::aggregate::{AggCall, BoxedAggregateFunction, build_append_only};
25
26use self::distinct::Distinct;
27use self::filter::*;
28use self::orderby::ProjectionOrderBy;
29use self::projection::Projection;
30
31/// Build an `BoxedAggregateFunction` from `AggCall`.
32pub fn build(agg: &AggCall) -> Result<BoxedAggregateFunction> {
33    let mut aggregator = build_append_only(agg)?;
34
35    if agg.distinct {
36        aggregator = Box::new(Distinct::new(aggregator));
37    }
38    if agg.column_orders.is_empty() {
39        aggregator = Box::new(Projection::new(agg.args.val_indices().to_vec(), aggregator));
40    } else {
41        aggregator = Box::new(ProjectionOrderBy::new(
42            agg.args.arg_types().to_vec(),
43            agg.args.val_indices().to_vec(),
44            agg.column_orders.clone(),
45            aggregator,
46        ));
47    }
48    if let Some(expr) = &agg.filter {
49        aggregator = Box::new(Filter::new(expr.clone(), aggregator));
50    }
51
52    Ok(aggregator)
53}