risingwave_stream/executor/aggregate/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::array::ArrayImpl::Bool;
18use risingwave_common::array::DataChunk;
19use risingwave_common::bail;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_expr::aggregate::{AggCall, AggType, PbAggKind};
22use risingwave_expr::expr::{LogReport, NonStrictExpression};
23use risingwave_pb::stream_plan::PbAggNodeVersion;
24
25use crate::executor::prelude::*;
26
27mod agg_group;
28mod agg_state;
29mod agg_state_cache;
30mod distinct;
31mod hash_agg;
32mod minput;
33mod simple_agg;
34mod stateless_simple_agg;
35
36pub use agg_state::AggStateStorage;
37pub use hash_agg::HashAggExecutor;
38pub use simple_agg::SimpleAggExecutor;
39pub use stateless_simple_agg::StatelessSimpleAggExecutor;
40
41/// Arguments needed to construct an `XxxAggExecutor`.
42pub struct AggExecutorArgs<S: StateStore, E: AggExecutorExtraArgs> {
43    pub version: PbAggNodeVersion,
44
45    // basic
46    pub input: Executor,
47    pub actor_ctx: ActorContextRef,
48    pub info: ExecutorInfo,
49
50    // system configs
51    pub extreme_cache_size: usize,
52
53    // agg common things
54    pub agg_calls: Vec<AggCall>,
55    pub row_count_index: usize,
56    pub storages: Vec<AggStateStorage<S>>,
57    pub intermediate_state_table: StateTable<S>,
58    pub distinct_dedup_tables: HashMap<usize, StateTable<S>>,
59    pub watermark_epoch: AtomicU64Ref,
60
61    // extra
62    pub extra: E,
63}
64
/// Marker trait for the executor-specific `extra` arguments carried by [`AggExecutorArgs`].
///
/// It declares no methods; it only bounds the `E` type parameter of `AggExecutorArgs`.
pub trait AggExecutorExtraArgs {}
66
/// Extra arguments needed to construct a `SimpleAggExecutor`.
pub struct SimpleAggExecutorExtraArgs {
    // NOTE(review): presumably forces the executor to emit output on every barrier, even when
    // nothing changed — confirm in `simple_agg`.
    pub must_output_per_barrier: bool,
}
impl AggExecutorExtraArgs for SimpleAggExecutorExtraArgs {}
71
/// Extra arguments needed to construct a `HashAggExecutor`.
pub struct HashAggExecutorExtraArgs {
    // NOTE(review): presumably the indices of the group-key columns in the input — confirm.
    pub group_key_indices: Vec<usize>,
    // NOTE(review): presumably the maximum number of rows per output chunk — confirm.
    pub chunk_size: usize,
    pub max_dirty_groups_heap_size: usize,
    // NOTE(review): presumably enables emit-on-window-close output semantics — confirm in
    // `hash_agg`.
    pub emit_on_window_close: bool,
}
impl AggExecutorExtraArgs for HashAggExecutorExtraArgs {}
80
81async fn agg_call_filter_res(
82    agg_call: &AggCall,
83    chunk: &DataChunk,
84) -> StreamExecutorResult<Bitmap> {
85    let mut vis = chunk.visibility().clone();
86    if matches!(
87        agg_call.agg_type,
88        AggType::Builtin(PbAggKind::Min | PbAggKind::Max | PbAggKind::StringAgg)
89    ) {
90        // should skip NULL value for these kinds of agg function
91        let agg_col_idx = agg_call.args.val_indices()[0]; // the first arg is the agg column for all these kinds
92        let agg_col_bitmap = chunk.column_at(agg_col_idx).null_bitmap();
93        vis &= agg_col_bitmap;
94    }
95
96    if let Some(ref filter) = agg_call.filter {
97        // TODO: should we build `filter` in non-strict mode?
98        if let Bool(filter_res) = NonStrictExpression::new_topmost(&**filter, LogReport)
99            .eval_infallible(chunk)
100            .await
101            .as_ref()
102        {
103            vis &= filter_res.to_bitmap();
104        } else {
105            bail!("Filter can only receive bool array");
106        }
107    }
108
109    Ok(vis)
110}
111
112fn iter_table_storage<S>(
113    state_storages: &mut [AggStateStorage<S>],
114) -> impl Iterator<Item = &mut StateTable<S>>
115where
116    S: StateStore,
117{
118    state_storages
119        .iter_mut()
120        .filter_map(|storage| match storage {
121            AggStateStorage::Value => None,
122            AggStateStorage::MaterializedInput { table, .. } => Some(table),
123        })
124}