risingwave_stream/executor/top_n/utils.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use risingwave_common::row;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::ColumnOrder;

use super::top_n_cache::CacheKey;
use crate::common::table::state_table::StateTablePostCommit;
use crate::executor::prelude::*;

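/// Common interface of the concrete Top-N executors, driven by [`TopNExecutorWrapper`]:
/// `init` is called with the epoch of the first barrier, `apply_chunk` for each data chunk,
/// `flush_data` on each subsequent barrier, and `handle_watermark` for each watermark.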
pub trait TopNExecutorBase: Send + 'static {
    type State: StateStore;
    /// Apply the chunk to the dirty state and get the diffs.
    /// TODO(rc): There can be up to 2x amplification in terms of chunk size, so we may need to
    /// allow `apply_chunk` to return a stream of chunks. The motivation is not very strong, though.
    fn apply_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> impl Future<Output = StreamExecutorResult<Option<StreamChunk>>> + Send;

    /// Flush the buffered state to the storage backend, committing the given epoch, and return
    /// the post-commit handle of the state table.
    fn flush_data(
        &mut self,
        epoch: EpochPair,
    ) -> impl Future<Output = StreamExecutorResult<StateTablePostCommit<'_, Self::State>>> + Send;

    /// Opportunistically flush the buffered state to the storage backend without committing
    /// an epoch.
    fn try_flush_data(&mut self) -> impl Future<Output = StreamExecutorResult<()>> + Send;

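    /// Clear the in-memory cache when it may have become stale after a vnode bitmap update.
    /// Only the Group Top-N executors are expected to override this, hence the default is
    /// unreachable.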
    fn clear_cache(&mut self) {
        unreachable!()
    }

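    /// Evict entries from the in-memory cache. Called by [`TopNExecutorWrapper`] before
    /// processing each incoming message; no-op by default.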
    fn evict(&mut self) {}

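    /// Initialize the executor's state (e.g., its state table) with the epoch of the first
    /// barrier.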
    fn init(&mut self, epoch: EpochPair) -> impl Future<Output = StreamExecutorResult<()>> + Send;

    /// Handle an incoming watermark and return the watermark to emit downstream, if any.
    fn handle_watermark(
        &mut self,
        watermark: Watermark,
    ) -> impl Future<Output = Option<Watermark>> + Send;
}

/// A wrapper that connects a [`TopNExecutorBase`] implementation to its input stream and
/// drives its execution.
pub struct TopNExecutorWrapper<E> {
    pub(super) input: Executor,
    pub(super) ctx: ActorContextRef,
    pub(super) inner: E,
}

impl<E> Execute for TopNExecutorWrapper<E>
where
    E: TopNExecutorBase,
{
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.top_n_executor_execute().boxed()
    }
}

impl<E> TopNExecutorWrapper<E>
where
    E: TopNExecutorBase,
{
    /// Note that the Top-N executor differs from the aggregate executor: it must output diffs
    /// whenever it applies a batch of input data. Therefore, on receiving a barrier, the Top-N
    /// executor only flushes data instead of computing diffs and then flushing.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub(crate) async fn top_n_executor_execute(mut self: Box<Self>) {
        let mut input = self.input.execute();

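        // Consume the first barrier to obtain the initial epoch, forward it downstream, and
        // then initialize the inner executor with that epoch.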
        let barrier = expect_first_barrier(&mut input).await?;
        let barrier_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        self.inner.init(barrier_epoch).await?;

        #[for_await]
        for msg in input {
            self.inner.evict();
            let msg = msg?;
            match msg {
                Message::Watermark(watermark) => {
                    if let Some(output_watermark) = self.inner.handle_watermark(watermark).await {
                        yield Message::Watermark(output_watermark);
                    }
                }
                Message::Chunk(chunk) => {
                    if let Some(output_chunk) = self.inner.apply_chunk(chunk).await? {
                        yield Message::Chunk(output_chunk);
                    }
                    self.inner.try_flush_data().await?;
                }
                Message::Barrier(barrier) => {
                    let post_commit = self.inner.flush_data(barrier.epoch).await?;
                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
                    yield Message::Barrier(barrier);

                    // Apply the pending vnode bitmap update to the state table, if any; this is
                    // only relevant to Group Top-N since it's distributed.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        // The cache may have become stale after the vnode bitmap update, so
                        // clear it.
                        if cache_may_stale {
                            self.inner.clear_cache();
                        }
                    }
                }
            };
        }
    }
}

/// A given pk (Row) is split into the `order_key` part and the `additional_pk` part according
/// to `order_by_len`, and the two parts are serialized separately.
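/// For example, with `order_by_len == 2`, a pk `(a, b, c)` is split into `(a, b)` (the
/// `order_key` part) and `(c)` (the `additional_pk` part) before serialization.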
pub fn serialize_pk_to_cache_key(pk: impl Row, cache_key_serde: &CacheKeySerde) -> CacheKey {
    // TODO(row trait): may support splitting row
    let pk = pk.into_owned_row().into_inner();
    let (cache_key_first, cache_key_second) = pk.split_at(cache_key_serde.2);
    (
        cache_key_first.memcmp_serialize(&cache_key_serde.0),
        cache_key_second.memcmp_serialize(&cache_key_serde.1),
    )
}

/// See [`CacheKey`].
///
/// The last `usize` is the length of `order_by`, i.e., the first part of the key.
pub type CacheKeySerde = (OrderedRowSerde, OrderedRowSerde, usize);

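/// Builds the [`CacheKeySerde`] from a storage key laid out as
/// `group_by ++ order_by ++ additional_pk`: the `group_by` prefix is skipped, the `order_by`
/// columns form the first serde, and the remaining pk columns form the second.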
pub fn create_cache_key_serde(
    storage_key: &[ColumnOrder],
    schema: &Schema,
    order_by: &[ColumnOrder],
    group_by: &[usize],
) -> CacheKeySerde {
    {
        // validate storage_key = group_by + order_by + additional_pk
        for i in 0..group_by.len() {
            assert_eq!(storage_key[i].column_index, group_by[i]);
        }
        for i in group_by.len()..(group_by.len() + order_by.len()) {
            assert_eq!(storage_key[i], order_by[i - group_by.len()]);
        }
    }

    let (cache_key_data_types, cache_key_order_types): (Vec<_>, Vec<_>) = storage_key
        [group_by.len()..]
        .iter()
        .map(|o| (schema[o.column_index].data_type(), o.order_type))
        .unzip();

    let order_by_len = order_by.len();
    let (first_key_data_types, second_key_data_types) = cache_key_data_types.split_at(order_by_len);
    let (first_key_order_types, second_key_order_types) =
        cache_key_order_types.split_at(order_by_len);
    let first_key_serde = OrderedRowSerde::new(
        first_key_data_types.to_vec(),
        first_key_order_types.to_vec(),
    );
    let second_key_serde = OrderedRowSerde::new(
        second_key_data_types.to_vec(),
        second_key_order_types.to_vec(),
    );
    (first_key_serde, second_key_serde, order_by_len)
}

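/// Row type used as the group key by the Group Top-N executors.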
pub trait GroupKey = row::Row + Send + Sync;
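/// Group key placeholder for the non-grouped Top-N executors.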
pub const NO_GROUP_KEY: Option<row::Empty> = None;