risingwave_stream/executor/top_n/group_top_n_appendonly.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use risingwave_common::array::Op;
use risingwave_common::hash::HashKey;
use risingwave_common::row::{RowDeserializer, RowExt};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::sort_util::ColumnOrder;

use super::group_top_n::GroupTopNCache;
use super::top_n_cache::AppendOnlyTopNCacheTrait;
use super::utils::*;
use super::{ManagedTopNState, TopNCache};
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTablePostCommit;
use crate::executor::monitor::GroupTopNMetrics;
use crate::executor::prelude::*;

/// If the input is append-only, `AppendOnlyGroupTopNExecutor` does not need to
/// keep all the rows it has seen: once a record falls out of a group's result
/// set, it can never re-enter it, so it can be deleted from the state table.
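///
/// For illustration (not taken from the planner), a query of roughly this
/// shape over an append-only table may be planned as a group TopN:
///
/// ```sql
/// SELECT * FROM (
///     SELECT *, ROW_NUMBER() OVER (PARTITION BY grp ORDER BY ts) AS rank
///     FROM t
/// ) WHERE rank <= 10;
/// ```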
pub type AppendOnlyGroupTopNExecutor<K, S, const WITH_TIES: bool> =
    TopNExecutorWrapper<InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>>;

impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
    AppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        input: Executor,
        ctx: ActorContextRef,
        schema: Schema,
        storage_key: Vec<ColumnOrder>,
        offset_and_limit: (usize, usize),
        order_by: Vec<ColumnOrder>,
        group_by: Vec<usize>,
        state_table: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
    ) -> StreamResult<Self> {
        let inner = InnerAppendOnlyGroupTopNExecutor::new(
            schema,
            storage_key,
            offset_and_limit,
            order_by,
            group_by,
            state_table,
            watermark_epoch,
            &ctx,
        )?;
        Ok(TopNExecutorWrapper { input, ctx, inner })
    }
}

pub struct InnerAppendOnlyGroupTopNExecutor<K: HashKey, S: StateStore, const WITH_TIES: bool> {
    schema: Schema,

    /// `LIMIT XXX`.
    limit: usize,

    /// `OFFSET XXX`. `0` means no offset.
    offset: usize,

    /// The storage key indices of the `AppendOnlyGroupTopNExecutor`.
    storage_key_indices: PkIndices,

    managed_state: ManagedTopNState<S>,

    /// Which columns are used to group the data.
    group_by: Vec<usize>,

    /// Group key -> cache for this group.
    caches: GroupTopNCache<K, WITH_TIES>,

    /// Used for serializing pk into `CacheKey`.
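    /// A `CacheKey` is expected to be the memcomparable encoding of the
    /// order-by columns followed by the remaining pk columns, so that entries
    /// in a group's cache sort in the TopN order.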
    cache_key_serde: CacheKeySerde,

    /// Minimum cache capacity per group from config.
    topn_cache_min_capacity: usize,

    metrics: GroupTopNMetrics,
}

impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
    InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        schema: Schema,
        storage_key: Vec<ColumnOrder>,
        offset_and_limit: (usize, usize),
        order_by: Vec<ColumnOrder>,
        group_by: Vec<usize>,
        state_table: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        ctx: &ActorContext,
    ) -> StreamResult<Self> {
        let metrics_info = MetricsInfo::new(
            ctx.streaming_metrics.clone(),
            state_table.table_id(),
            ctx.id,
            "AppendOnlyGroupTopN",
        );
        let metrics = ctx.streaming_metrics.new_append_only_group_top_n_metrics(
            state_table.table_id(),
            ctx.id,
            ctx.fragment_id,
        );

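        // The storage key is expected to start with the group-by columns,
        // followed by the order-by columns and the remaining pk; the serde
        // built from it encodes the non-group part as each row's cache key.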
        let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by);
        let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());

        Ok(Self {
            schema,
            offset: offset_and_limit.0,
            limit: offset_and_limit.1,
            managed_state,
            storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
            group_by,
            caches: GroupTopNCache::new(watermark_epoch, metrics_info),
            cache_key_serde,
            topn_cache_min_capacity: ctx.streaming_config.developer.topn_cache_min_capacity,
            metrics,
        })
    }
}

impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
    for InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
where
    TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
{
    type State = S;

    async fn apply_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> StreamExecutorResult<Option<StreamChunk>> {
        let keys = K::build_many(&self.group_by, chunk.data_chunk());
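        // Per-group staging buffers: each `TopNStaging` collects the changes
        // this chunk produces for one group, to be merged into a single output
        // chunk at the end.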
        let mut stagings = HashMap::new(); // K -> `TopNStaging`

        let data_types = self.schema.data_types();
        let deserializer = RowDeserializer::new(data_types.clone());
        for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            let Some((op, row_ref)) = r else {
                continue;
            };

            // The storage key without the group-by prefix, i.e. the pk part
            // used as this row's cache key within its group.
            let pk_row = row_ref.project(&self.storage_key_indices[self.group_by.len()..]);
            let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);

            let group_key = row_ref.project(&self.group_by);
            self.metrics.group_top_n_total_query_cache_count.inc();
            // If `self.caches` does not already have a cache for the current
            // group, create one, initialize it from the state table, and
            // insert it into `self.caches`.
            if !self.caches.contains(group_cache_key) {
                self.metrics.group_top_n_cache_miss_count.inc();
                let mut topn_cache = TopNCache::with_min_capacity(
                    self.offset,
                    self.limit,
                    data_types.clone(),
                    self.topn_cache_min_capacity,
                );
                self.managed_state
                    .init_append_only_topn_cache(Some(group_key), &mut topn_cache)
                    .await?;
                self.caches.put(group_cache_key.clone(), topn_cache);
            }

            let mut cache = self.caches.get_mut(group_cache_key).unwrap();
            let staging = stagings.entry(group_cache_key.clone()).or_default();

            debug_assert_eq!(op, Op::Insert);
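            // The input is append-only, so every op must be an `Insert`. The
            // cache insert below writes the new row to the state table if it
            // enters the TopN range, and may evict a row that falls out of
            // range; both changes are recorded in `staging`.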
            cache.insert(
                cache_key,
                row_ref,
                staging,
                &mut self.managed_state,
                &deserializer,
            )?;
        }

        self.metrics
            .group_top_n_cached_entry_count
            .set(self.caches.len() as i64);

        let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
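        // Merge all per-group staged changes into a single output chunk.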
        for staging in stagings.into_values() {
            for res in staging.into_deserialized_changes(&deserializer) {
                let (op, row) = res?;
                let _none = chunk_builder.append_row(op, row);
            }
        }

        Ok(chunk_builder.take())
    }

    async fn flush_data(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
        self.managed_state.flush(epoch).await
    }

    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        self.managed_state.try_flush().await
    }

    fn clear_cache(&mut self) {
        self.caches.clear();
    }

    fn evict(&mut self) {
        self.caches.evict()
    }

    async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.managed_state.init_epoch(epoch).await
    }

    async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
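        // The state is keyed with the group key as its prefix, so only a
        // watermark on the first group-by column can be used to clean up
        // expired groups from the state table and be propagated downstream.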
        if watermark.col_idx == self.group_by[0] {
            self.managed_state.update_watermark(watermark.val.clone());
            Some(watermark)
        } else {
            None
        }
    }
}