Skip to main content

risingwave_stream/from_proto/
group_top_n.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::catalog::Schema;
18use risingwave_common::hash::{HashKey, HashKeyDispatcher};
19use risingwave_common::types::DataType;
20use risingwave_common::util::sort_util::ColumnOrder;
21use risingwave_pb::stream_plan::GroupTopNNode;
22
23use super::*;
24use crate::common::table::state_table::{StateTable, StateTableBuilder};
25use crate::executor::{ActorContextRef, AppendOnlyGroupTopNExecutor, GroupTopNExecutor};
26use crate::task::AtomicU64Ref;
27
28pub struct GroupTopNExecutorBuilder<const APPEND_ONLY: bool>;
29
30impl_stream_node_body!(GroupTopN(GroupTopNNode) => GroupTopNExecutorBuilder<false>);
31impl_stream_node_body!(AppendOnlyGroupTopN(GroupTopNNode) => GroupTopNExecutorBuilder<true>);
32
33impl<const APPEND_ONLY: bool> ExecutorBuilder for GroupTopNExecutorBuilder<APPEND_ONLY> {
34    type Node = GroupTopNNode;
35
36    async fn new_boxed_executor(
37        params: ExecutorParams,
38        node: &Self::Node,
39        store: impl StateStore,
40    ) -> StreamResult<Executor> {
41        let group_by: Vec<usize> = node
42            .get_group_key()
43            .iter()
44            .map(|idx| *idx as usize)
45            .collect();
46        let table = node.get_table()?;
47        let vnodes = params.vnode_bitmap.map(Arc::new);
48        let state_table = StateTableBuilder::new(table, store, vnodes)
49            .enable_preload_all_rows_by_config(&params.config)
50            .build()
51            .await;
52        let storage_key = table
53            .get_pk()
54            .iter()
55            .map(ColumnOrder::from_protobuf)
56            .collect();
57        let [input]: [_; 1] = params.input.try_into().unwrap();
58        let group_key_types = group_by
59            .iter()
60            .map(|i| input.schema()[*i].data_type())
61            .collect();
62        let order_by = node
63            .order_by
64            .iter()
65            .map(ColumnOrder::from_protobuf)
66            .collect();
67
68        let args = GroupTopNExecutorDispatcherArgs {
69            input,
70            ctx: params.actor_context,
71            schema: params.info.schema.clone(),
72            storage_key,
73            offset_and_limit: (node.offset as usize, node.limit as usize),
74            order_by,
75            group_by,
76            state_table,
77            watermark_epoch: params.watermark_epoch,
78            group_key_types,
79
80            with_ties: node.with_ties,
81            append_only: APPEND_ONLY,
82        };
83        Ok((params.info, args.dispatch()?).into())
84    }
85}
86
87struct GroupTopNExecutorDispatcherArgs<S: StateStore> {
88    input: Executor,
89    ctx: ActorContextRef,
90    schema: Schema,
91    storage_key: Vec<ColumnOrder>,
92    offset_and_limit: (usize, usize),
93    order_by: Vec<ColumnOrder>,
94    group_by: Vec<usize>,
95    state_table: StateTable<S>,
96    watermark_epoch: AtomicU64Ref,
97    group_key_types: Vec<DataType>,
98
99    with_ties: bool,
100    append_only: bool,
101}
102
103impl<S: StateStore> HashKeyDispatcher for GroupTopNExecutorDispatcherArgs<S> {
104    type Output = StreamResult<Box<dyn Execute>>;
105
106    fn dispatch_impl<K: HashKey>(self) -> Self::Output {
107        macro_rules! build {
108            ($executor:ident, $with_ties:literal) => {
109                Ok($executor::<K, S, $with_ties>::new(
110                    self.input,
111                    self.ctx,
112                    self.schema,
113                    self.storage_key,
114                    self.offset_and_limit,
115                    self.order_by,
116                    self.group_by,
117                    self.state_table,
118                    self.watermark_epoch,
119                )?
120                .boxed())
121            };
122        }
123        match (self.append_only, self.with_ties) {
124            (true, true) => build!(AppendOnlyGroupTopNExecutor, true),
125            (true, false) => build!(AppendOnlyGroupTopNExecutor, false),
126            (false, true) => build!(GroupTopNExecutor, true),
127            (false, false) => build!(GroupTopNExecutor, false),
128        }
129    }
130
131    fn data_types(&self) -> &[DataType] {
132        &self.group_key_types
133    }
134}