risingwave_stream/from_proto/
hash_agg.rs1use std::sync::Arc;
18
19use risingwave_common::hash::{HashKey, HashKeyDispatcher};
20use risingwave_common::types::DataType;
21use risingwave_expr::aggregate::AggCall;
22use risingwave_pb::stream_plan::HashAggNode;
23
24use super::agg_common::{
25 build_agg_state_storages_from_proto, build_distinct_dedup_table_from_proto,
26};
27use super::*;
28use crate::common::table::state_table::StateTableBuilder;
29use crate::executor::aggregate::{AggExecutorArgs, HashAggExecutor, HashAggExecutorExtraArgs};
30
31pub struct HashAggExecutorDispatcherArgs<S: StateStore> {
32 args: AggExecutorArgs<S, HashAggExecutorExtraArgs>,
33 group_key_types: Vec<DataType>,
34}
35
36impl<S: StateStore> HashKeyDispatcher for HashAggExecutorDispatcherArgs<S> {
37 type Output = StreamResult<Box<dyn Execute>>;
38
39 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
40 Ok(HashAggExecutor::<K, S>::new(self.args)?.boxed())
41 }
42
43 fn data_types(&self) -> &[DataType] {
44 &self.group_key_types
45 }
46}
47
48pub struct HashAggExecutorBuilder;
49
50impl_stream_node_body!(HashAgg(HashAggNode) => HashAggExecutorBuilder);
51
52impl ExecutorBuilder for HashAggExecutorBuilder {
53 type Node = HashAggNode;
54
55 async fn new_boxed_executor(
56 params: ExecutorParams,
57 node: &Self::Node,
58 store: impl StateStore,
59 ) -> StreamResult<Executor> {
60 let group_key_indices = node
61 .get_group_key()
62 .iter()
63 .map(|key| *key as usize)
64 .collect::<Vec<_>>();
65 let [input]: [_; 1] = params.input.try_into().unwrap();
66 let group_key_types = group_key_indices
67 .iter()
68 .map(|idx| input.schema().fields[*idx].data_type())
69 .collect_vec();
70
71 let agg_calls: Vec<AggCall> = node
72 .get_agg_calls()
73 .iter()
74 .map(AggCall::from_protobuf)
75 .try_collect()?;
76
77 let vnodes = Some(Arc::new(
78 params.vnode_bitmap.expect("vnodes not set for hash agg"),
79 ));
80 let storages = build_agg_state_storages_from_proto(
81 node.get_agg_call_states(),
82 store.clone(),
83 vnodes.clone(),
84 ¶ms.config,
85 )
86 .await;
87 let intermediate_state_table = StateTableBuilder::new(
89 node.get_intermediate_state_table().unwrap(),
90 store.clone(),
91 vnodes.clone(),
92 )
93 .enable_preload_all_rows_by_config(¶ms.config)
94 .build()
95 .await;
96 let distinct_dedup_tables = build_distinct_dedup_table_from_proto(
97 node.get_distinct_dedup_tables(),
98 store,
99 vnodes,
100 ¶ms.config,
101 )
102 .await;
103
104 let exec = HashAggExecutorDispatcherArgs {
105 args: AggExecutorArgs {
106 version: node.version(),
107
108 input,
109 actor_ctx: params.actor_context,
110 info: params.info.clone(),
111
112 extreme_cache_size: params.config.developer.unsafe_extreme_cache_size,
113
114 agg_calls,
115 row_count_index: node.get_row_count_index() as usize,
116 storages,
117 intermediate_state_table,
118 distinct_dedup_tables,
119 watermark_epoch: params.watermark_epoch,
120 extra: HashAggExecutorExtraArgs {
121 group_key_indices,
122 chunk_size: params.config.developer.chunk_size,
123 max_dirty_groups_heap_size: (params.config.developer)
124 .hash_agg_max_dirty_groups_heap_size,
125 emit_on_window_close: node.get_emit_on_window_close(),
126 },
127 },
128 group_key_types,
129 }
130 .dispatch()?;
131 Ok((params.info, exec).into())
132 }
133}