risingwave_stream/from_proto/
group_top_n.rs1use std::sync::Arc;
16
17use risingwave_common::catalog::Schema;
18use risingwave_common::hash::{HashKey, HashKeyDispatcher};
19use risingwave_common::types::DataType;
20use risingwave_common::util::sort_util::ColumnOrder;
21use risingwave_pb::stream_plan::GroupTopNNode;
22
23use super::*;
24use crate::common::table::state_table::{StateTable, StateTableBuilder};
25use crate::executor::{ActorContextRef, AppendOnlyGroupTopNExecutor, GroupTopNExecutor};
26use crate::task::AtomicU64Ref;
27
28pub struct GroupTopNExecutorBuilder<const APPEND_ONLY: bool>;
29
30impl<const APPEND_ONLY: bool> ExecutorBuilder for GroupTopNExecutorBuilder<APPEND_ONLY> {
31 type Node = GroupTopNNode;
32
33 async fn new_boxed_executor(
34 params: ExecutorParams,
35 node: &Self::Node,
36 store: impl StateStore,
37 ) -> StreamResult<Executor> {
38 let group_by: Vec<usize> = node
39 .get_group_key()
40 .iter()
41 .map(|idx| *idx as usize)
42 .collect();
43 let table = node.get_table()?;
44 let vnodes = params.vnode_bitmap.map(Arc::new);
45 let state_table = StateTableBuilder::new(table, store, vnodes)
46 .enable_preload_all_rows_by_config(¶ms.actor_context.streaming_config)
47 .build()
48 .await;
49 let storage_key = table
50 .get_pk()
51 .iter()
52 .map(ColumnOrder::from_protobuf)
53 .collect();
54 let [input]: [_; 1] = params.input.try_into().unwrap();
55 let group_key_types = group_by
56 .iter()
57 .map(|i| input.schema()[*i].data_type())
58 .collect();
59 let order_by = node
60 .order_by
61 .iter()
62 .map(ColumnOrder::from_protobuf)
63 .collect();
64
65 let args = GroupTopNExecutorDispatcherArgs {
66 input,
67 ctx: params.actor_context,
68 schema: params.info.schema.clone(),
69 storage_key,
70 offset_and_limit: (node.offset as usize, node.limit as usize),
71 order_by,
72 group_by,
73 state_table,
74 watermark_epoch: params.watermark_epoch,
75 group_key_types,
76
77 with_ties: node.with_ties,
78 append_only: APPEND_ONLY,
79 };
80 Ok((params.info, args.dispatch()?).into())
81 }
82}
83
84struct GroupTopNExecutorDispatcherArgs<S: StateStore> {
85 input: Executor,
86 ctx: ActorContextRef,
87 schema: Schema,
88 storage_key: Vec<ColumnOrder>,
89 offset_and_limit: (usize, usize),
90 order_by: Vec<ColumnOrder>,
91 group_by: Vec<usize>,
92 state_table: StateTable<S>,
93 watermark_epoch: AtomicU64Ref,
94 group_key_types: Vec<DataType>,
95
96 with_ties: bool,
97 append_only: bool,
98}
99
100impl<S: StateStore> HashKeyDispatcher for GroupTopNExecutorDispatcherArgs<S> {
101 type Output = StreamResult<Box<dyn Execute>>;
102
103 fn dispatch_impl<K: HashKey>(self) -> Self::Output {
104 macro_rules! build {
105 ($excutor:ident, $with_ties:literal) => {
106 Ok($excutor::<K, S, $with_ties>::new(
107 self.input,
108 self.ctx,
109 self.schema,
110 self.storage_key,
111 self.offset_and_limit,
112 self.order_by,
113 self.group_by,
114 self.state_table,
115 self.watermark_epoch,
116 )?
117 .boxed())
118 };
119 }
120 match (self.append_only, self.with_ties) {
121 (true, true) => build!(AppendOnlyGroupTopNExecutor, true),
122 (true, false) => build!(AppendOnlyGroupTopNExecutor, false),
123 (false, true) => build!(GroupTopNExecutor, true),
124 (false, false) => build!(GroupTopNExecutor, false),
125 }
126 }
127
128 fn data_types(&self) -> &[DataType] {
129 &self.group_key_types
130 }
131}