risingwave_stream/executor/top_n/group_top_n_appendonly.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use risingwave_common::array::Op;
use risingwave_common::hash::HashKey;
use risingwave_common::row::{RowDeserializer, RowExt};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::sort_util::ColumnOrder;

use super::group_top_n::GroupTopNCache;
use super::top_n_cache::AppendOnlyTopNCacheTrait;
use super::utils::*;
use super::{ManagedTopNState, TopNCache};
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTablePostCommit;
use crate::executor::monitor::GroupTopNMetrics;
use crate::executor::prelude::*;
use crate::executor::top_n::top_n_cache::TopNStaging;

/// If the input is append-only, `AppendOnlyGroupTopNExecutor` does not need to keep
/// all the rows it has seen: since an append-only input never retracts rows, a record
/// that falls out of the result set can never re-enter it, so its state can be deleted.
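///
/// For example (an illustrative sketch, not a plan produced by this module), a
/// per-group Top-N query over an append-only table such as
///
/// ```sql
/// SELECT * FROM (
///     SELECT *, ROW_NUMBER() OVER (PARTITION BY g ORDER BY v) AS rank FROM t
/// ) WHERE rank <= 3;
/// ```
///
/// keeps, for each group `g`, only the three smallest rows by `v`. (Using `RANK()`
/// instead of `ROW_NUMBER()` would correspond to `WITH_TIES = true`.)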
pub type AppendOnlyGroupTopNExecutor<K, S, const WITH_TIES: bool> =
    TopNExecutorWrapper<InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>>;

impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
    AppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        input: Executor,
        ctx: ActorContextRef,
        schema: Schema,
        storage_key: Vec<ColumnOrder>,
        offset_and_limit: (usize, usize),
        order_by: Vec<ColumnOrder>,
        group_by: Vec<usize>,
        state_table: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
    ) -> StreamResult<Self> {
        let inner = InnerAppendOnlyGroupTopNExecutor::new(
            schema,
            storage_key,
            offset_and_limit,
            order_by,
            group_by,
            state_table,
            watermark_epoch,
            &ctx,
        )?;
        Ok(TopNExecutorWrapper { input, ctx, inner })
    }
}

pub struct InnerAppendOnlyGroupTopNExecutor<K: HashKey, S: StateStore, const WITH_TIES: bool> {
    schema: Schema,

    /// `LIMIT XXX`.
    limit: usize,

    /// `OFFSET XXX`. `0` means no offset.
    offset: usize,

    /// The storage key indices of the `AppendOnlyGroupTopNExecutor`.
    storage_key_indices: PkIndices,

    managed_state: ManagedTopNState<S>,

    /// Which columns are used to group the data.
    group_by: Vec<usize>,

    /// Group key -> cache for this group.
    caches: GroupTopNCache<K, WITH_TIES>,

    /// Used for serializing pk into `CacheKey`.
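    /// (Roughly: the `ORDER BY` part of the pk is serialized first, so cache entries
    /// within a group sort in Top-N order; see `create_cache_key_serde`.)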
    cache_key_serde: CacheKeySerde,

    /// Minimum cache capacity per group, from config.
    topn_cache_min_capacity: usize,

    metrics: GroupTopNMetrics,
}

impl<K: HashKey, S: StateStore, const WITH_TIES: bool>
    InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        schema: Schema,
        storage_key: Vec<ColumnOrder>,
        offset_and_limit: (usize, usize),
        order_by: Vec<ColumnOrder>,
        group_by: Vec<usize>,
        state_table: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        ctx: &ActorContext,
    ) -> StreamResult<Self> {
        let metrics_info = MetricsInfo::new(
            ctx.streaming_metrics.clone(),
            state_table.table_id(),
            ctx.id,
            "AppendOnlyGroupTopN",
        );
        let metrics = ctx.streaming_metrics.new_append_only_group_top_n_metrics(
            state_table.table_id(),
            ctx.id,
            ctx.fragment_id,
        );

        let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by);
        let managed_state = ManagedTopNState::<S>::new(state_table, cache_key_serde.clone());

        Ok(Self {
            schema,
            offset: offset_and_limit.0,
            limit: offset_and_limit.1,
            managed_state,
            storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(),
            group_by,
            caches: GroupTopNCache::new(watermark_epoch, metrics_info),
            cache_key_serde,
            topn_cache_min_capacity: ctx.streaming_config.developer.topn_cache_min_capacity,
            metrics,
        })
    }
}

impl<K: HashKey, S: StateStore, const WITH_TIES: bool> TopNExecutorBase
    for InnerAppendOnlyGroupTopNExecutor<K, S, WITH_TIES>
where
    TopNCache<WITH_TIES>: AppendOnlyTopNCacheTrait,
{
    type State = S;

    async fn apply_chunk(
        &mut self,
        chunk: StreamChunk,
    ) -> StreamExecutorResult<Option<StreamChunk>> {
        let keys = K::build_many(&self.group_by, chunk.data_chunk());
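        // Buffer the changes each group produces for this chunk; they are turned
        // into output records after all input rows have been applied.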
        let mut stagings: HashMap<K, TopNStaging> = HashMap::new();

        let data_types = self.schema.data_types();
        let deserializer = RowDeserializer::new(data_types.clone());
        for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            let Some((op, row_ref)) = r else {
                continue;
            };

            // The pk without the group-by prefix (the storage key is laid out as
            // `group_by ++ remaining pk`).
            let pk_row = row_ref.project(&self.storage_key_indices[self.group_by.len()..]);
            let cache_key = serialize_pk_to_cache_key(pk_row, &self.cache_key_serde);

            let group_key = row_ref.project(&self.group_by);
            self.metrics.group_top_n_total_query_cache_count.inc();
            // If `self.caches` does not already have a cache for the current group,
            // create a new cache for it and insert it into `self.caches`.
            if !self.caches.contains(group_cache_key) {
                self.metrics.group_top_n_cache_miss_count.inc();
                let mut topn_cache = TopNCache::with_min_capacity(
                    self.offset,
                    self.limit,
                    data_types.clone(),
                    self.topn_cache_min_capacity,
                );
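                // Populate the new cache with this group's Top-N rows from the
                // state table before applying the incoming row.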
                self.managed_state
                    .init_append_only_topn_cache(Some(group_key), &mut topn_cache)
                    .await?;
                self.caches.put(group_cache_key.clone(), topn_cache);
            }

            let mut cache = self.caches.get_mut(group_cache_key).unwrap();
            let staging = stagings.entry(group_cache_key.clone()).or_default();

            debug_assert_eq!(op, Op::Insert);
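            // Apply the append-only insert: if the new row pushes an existing row out
            // of the Top-N range, that row is deleted from `managed_state` and the
            // change is recorded in `staging` (see the module-level doc above).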
            cache.insert(
                cache_key,
                row_ref,
                staging,
                &mut self.managed_state,
                &deserializer,
            )?;
        }

        self.metrics
            .group_top_n_cached_entry_count
            .set(self.caches.len() as i64);

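        // Emit all staged changes for this input chunk as one output chunk; `take`
        // yields `None` if the Top-N results did not change.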
        let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
        for staging in stagings.into_values() {
            for res in staging.into_deserialized_changes(&deserializer) {
                let record = res?;
                let _none = chunk_builder.append_record(record);
            }
        }

        Ok(chunk_builder.take())
    }

    async fn flush_data(
        &mut self,
        epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S>> {
        self.managed_state.flush(epoch).await
    }

    async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
        self.managed_state.try_flush().await
    }

    fn clear_cache(&mut self) {
        self.caches.clear();
    }

    fn evict(&mut self) {
        self.caches.evict()
    }

    async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.managed_state.init_epoch(epoch).await
    }

    async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
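        // Only a watermark on the first group-by column matches the state table's
        // key prefix, so only that one drives state cleaning and is propagated
        // downstream; watermarks on other columns are swallowed.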
        if watermark.col_idx == self.group_by[0] {
            self.managed_state.update_watermark(watermark.val.clone());
            Some(watermark)
        } else {
            None
        }
    }
}