risingwave_stream/executor/aggregate/
mod.rs1use std::collections::HashMap;
16
17use risingwave_common::array::ArrayImpl::Bool;
18use risingwave_common::array::DataChunk;
19use risingwave_common::bail;
20use risingwave_common::bitmap::Bitmap;
21use risingwave_expr::aggregate::{AggCall, AggType, PbAggKind};
22use risingwave_expr::expr::{LogReport, NonStrictExpression};
23use risingwave_pb::stream_plan::PbAggNodeVersion;
24
25use crate::executor::prelude::*;
26
27mod agg_group;
28mod agg_state;
29mod agg_state_cache;
30mod distinct;
31mod hash_agg;
32mod minput;
33mod simple_agg;
34mod stateless_simple_agg;
35
36pub use agg_state::AggStateStorage;
37pub use hash_agg::HashAggExecutor;
38pub use simple_agg::SimpleAggExecutor;
39pub use stateless_simple_agg::StatelessSimpleAggExecutor;
40
41pub struct AggExecutorArgs<S: StateStore, E: AggExecutorExtraArgs> {
43 pub version: PbAggNodeVersion,
44
45 pub input: Executor,
47 pub actor_ctx: ActorContextRef,
48 pub info: ExecutorInfo,
49
50 pub extreme_cache_size: usize,
52
53 pub agg_calls: Vec<AggCall>,
55 pub row_count_index: usize,
56 pub storages: Vec<AggStateStorage<S>>,
57 pub intermediate_state_table: StateTable<S>,
58 pub distinct_dedup_tables: HashMap<usize, StateTable<S>>,
59 pub watermark_epoch: AtomicU64Ref,
60
61 pub extra: E,
63}
64
65pub trait AggExecutorExtraArgs {}
66
67pub struct SimpleAggExecutorExtraArgs {
68 pub must_output_per_barrier: bool,
69}
70impl AggExecutorExtraArgs for SimpleAggExecutorExtraArgs {}
71
72pub struct HashAggExecutorExtraArgs {
74 pub group_key_indices: Vec<usize>,
75 pub chunk_size: usize,
76 pub max_dirty_groups_heap_size: usize,
77 pub emit_on_window_close: bool,
78}
79impl AggExecutorExtraArgs for HashAggExecutorExtraArgs {}
80
81async fn agg_call_filter_res(
82 agg_call: &AggCall,
83 chunk: &DataChunk,
84) -> StreamExecutorResult<Bitmap> {
85 let mut vis = chunk.visibility().clone();
86 if matches!(
87 agg_call.agg_type,
88 AggType::Builtin(PbAggKind::Min | PbAggKind::Max | PbAggKind::StringAgg)
89 ) {
90 let agg_col_idx = agg_call.args.val_indices()[0]; let agg_col_bitmap = chunk.column_at(agg_col_idx).null_bitmap();
93 vis &= agg_col_bitmap;
94 }
95
96 if let Some(ref filter) = agg_call.filter {
97 if let Bool(filter_res) = NonStrictExpression::new_topmost(&**filter, LogReport)
99 .eval_infallible(chunk)
100 .await
101 .as_ref()
102 {
103 vis &= filter_res.to_bitmap();
104 } else {
105 bail!("Filter can only receive bool array");
106 }
107 }
108
109 Ok(vis)
110}
111
112fn iter_table_storage<S>(
113 state_storages: &mut [AggStateStorage<S>],
114) -> impl Iterator<Item = &mut StateTable<S>>
115where
116 S: StateStore,
117{
118 state_storages
119 .iter_mut()
120 .filter_map(|storage| match storage {
121 AggStateStorage::Value => None,
122 AggStateStorage::MaterializedInput { table, .. } => Some(table),
123 })
124}