// risingwave_stream/from_proto/hash_agg.rs
1use std::sync::Arc;
18
19use risingwave_common::hash::{HashKey, HashKeyDispatcher};
20use risingwave_common::types::DataType;
21use risingwave_expr::aggregate::AggCall;
22use risingwave_pb::stream_plan::HashAggNode;
23
24use super::agg_common::{
25 build_agg_state_storages_from_proto, build_distinct_dedup_table_from_proto,
26};
27use super::*;
28use crate::common::table::state_table::StateTable;
29use crate::executor::aggregate::{AggExecutorArgs, HashAggExecutor, HashAggExecutorExtraArgs};
30
31pub struct HashAggExecutorDispatcherArgs<S: StateStore> {
32 args: AggExecutorArgs<S, HashAggExecutorExtraArgs>,
33 group_key_types: Vec<DataType>,
34}
35
36impl<S: StateStore> HashKeyDispatcher for HashAggExecutorDispatcherArgs<S> {
37 type Output = StreamResult<Box<dyn Execute>>;
38
39 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
40 Ok(HashAggExecutor::<K, S>::new(self.args)?.boxed())
41 }
42
43 fn data_types(&self) -> &[DataType] {
44 &self.group_key_types
45 }
46}
47
48pub struct HashAggExecutorBuilder;
49
50impl ExecutorBuilder for HashAggExecutorBuilder {
51 type Node = HashAggNode;
52
53 async fn new_boxed_executor(
54 params: ExecutorParams,
55 node: &Self::Node,
56 store: impl StateStore,
57 ) -> StreamResult<Executor> {
58 let group_key_indices = node
59 .get_group_key()
60 .iter()
61 .map(|key| *key as usize)
62 .collect::<Vec<_>>();
63 let [input]: [_; 1] = params.input.try_into().unwrap();
64 let group_key_types = group_key_indices
65 .iter()
66 .map(|idx| input.schema().fields[*idx].data_type())
67 .collect_vec();
68
69 let agg_calls: Vec<AggCall> = node
70 .get_agg_calls()
71 .iter()
72 .map(AggCall::from_protobuf)
73 .try_collect()?;
74
75 let vnodes = Some(Arc::new(
76 params.vnode_bitmap.expect("vnodes not set for hash agg"),
77 ));
78 let storages = build_agg_state_storages_from_proto(
79 node.get_agg_call_states(),
80 store.clone(),
81 vnodes.clone(),
82 )
83 .await;
84 let intermediate_state_table = StateTable::from_table_catalog(
86 node.get_intermediate_state_table().unwrap(),
87 store.clone(),
88 vnodes.clone(),
89 )
90 .await;
91 let distinct_dedup_tables =
92 build_distinct_dedup_table_from_proto(node.get_distinct_dedup_tables(), store, vnodes)
93 .await;
94
95 let exec = HashAggExecutorDispatcherArgs {
96 args: AggExecutorArgs {
97 version: node.version(),
98
99 input,
100 actor_ctx: params.actor_context,
101 info: params.info.clone(),
102
103 extreme_cache_size: params.env.config().developer.unsafe_extreme_cache_size,
104
105 agg_calls,
106 row_count_index: node.get_row_count_index() as usize,
107 storages,
108 intermediate_state_table,
109 distinct_dedup_tables,
110 watermark_epoch: params.watermark_epoch,
111 extra: HashAggExecutorExtraArgs {
112 group_key_indices,
113 chunk_size: params.env.config().developer.chunk_size,
114 max_dirty_groups_heap_size: params
115 .env
116 .config()
117 .developer
118 .hash_agg_max_dirty_groups_heap_size,
119 emit_on_window_close: node.get_emit_on_window_close(),
120 },
121 },
122 group_key_types,
123 }
124 .dispatch()?;
125 Ok((params.info, exec).into())
126 }
127}