risingwave_stream/executor/top_n/
top_n_cache.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::future::Future;
18
19use itertools::Itertools;
20use risingwave_common::array::RowRef;
21use risingwave_common::array::stream_record::Record;
22use risingwave_common::row::{CompactedRow, OwnedRow, Row, RowDeserializer, RowExt};
23use risingwave_common::types::DataType;
24use risingwave_common_estimate_size::EstimateSize;
25use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
26use risingwave_storage::StateStore;
27
28use super::{GroupKey, ManagedTopNState};
29use crate::common::change_buffer::ChangeBuffer;
30use crate::consistency::{consistency_error, enable_strict_consistency};
31use crate::executor::error::StreamExecutorResult;
32
/// `CacheKey` is composed of `(order_by, remaining columns of pk)`.
pub type CacheKey = (Vec<u8>, Vec<u8>);
/// A size-tracked ordered map from [`CacheKey`] to the compacted full row.
pub type Cache = EstimatedBTreeMap<CacheKey, CompactedRow>;

/// The high cache capacity is `(offset + limit) * TOPN_CACHE_HIGH_CAPACITY_FACTOR`
/// (saturating), but no less than the minimum capacity. See `TopNCache::with_min_capacity`.
const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
/// Default lower bound for the high cache capacity, used by `TopNCache::new`.
const TOPN_CACHE_MIN_CAPACITY: usize = 10;
39
/// Cache for [`ManagedTopNState`].
///
/// The key in the maps [`CacheKey`] is `[ order_by + remaining columns of pk ]`. `group_key` is not
/// included.
///
/// # `WITH_TIES`
///
/// `WITH_TIES` supports the semantic of `FETCH FIRST n ROWS WITH TIES` and `RANK() <= n`.
///
/// `OFFSET m FETCH FIRST n ROWS WITH TIES` and `m <= RANK() <= n` are not supported now,
/// since they have different semantics.
pub struct TopNCache<const WITH_TIES: bool> {
    /// Rows in the range `[0, offset)`. Should always be synced with state table.
    ///
    /// `None` iff `offset == 0` (no low part at all; see the constructor).
    pub low: Option<Cache>,

    /// Rows in the range `[offset, offset+limit)`. Should always be synced with state table.
    ///
    /// When `WITH_TIES` is true, it also stores ties for the last element,
    /// and thus the size can be larger than `limit`.
    pub middle: Cache,

    /// Cache of the beginning rows in the range `[offset+limit, ...)`.
    ///
    /// This is very similar to [`TopNStateCache`], which only caches the top-N rows in the table
    /// and only accepts new records that are less than the largest in the cache.
    ///
    /// When `WITH_TIES` is true, it guarantees that the ties of the last element are in the cache,
    /// and thus the size can be larger than `rest_cache_capacity`.
    ///
    /// When the cache becomes empty, if the `table_row_count` is not matched, we need to view the cache
    /// as unsynced and refill it from the state table.
    ///
    /// TODO(rc): later we should reuse [`TopNStateCache`] here.
    ///
    /// [`TopNStateCache`]: crate::common::state_cache::TopNStateCache
    pub high: Cache,
    /// Soft capacity of `high`: `(offset + limit) * TOPN_CACHE_HIGH_CAPACITY_FACTOR`,
    /// clamped to be at least the configured minimum capacity.
    pub high_cache_capacity: usize,

    /// `OFFSET` of the query. Must be 0 when `WITH_TIES` (asserted in the constructor).
    pub offset: usize,
    /// Assumption: `limit != 0`
    pub limit: usize,

    /// Number of rows corresponding to the current group.
    /// This is a nice-to-have information. `None` means we don't know the row count,
    /// but it doesn't prevent us from working correctly.
    table_row_count: Option<usize>,

    /// Data types for the full row.
    ///
    /// For debug formatting only.
    data_types: Vec<DataType>,
}
92
93impl<const WITH_TIES: bool> EstimateSize for TopNCache<WITH_TIES> {
94    fn estimated_heap_size(&self) -> usize {
95        self.low.estimated_heap_size()
96            + self.middle.estimated_heap_size()
97            + self.high.estimated_heap_size()
98    }
99}
100
impl<const WITH_TIES: bool> Debug for TopNCache<WITH_TIES> {
    // Multi-line debug representation: header with the scalar parameters, then one
    // section per cache (low/middle/high) with each entry rendered on its own line.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "TopNCache {{\n  offset: {}, limit: {}, high_cache_capacity: {},\n",
            self.offset, self.limit, self.high_cache_capacity
        )?;

        // Helper: print every `(cache_key, row)` entry of one cache, indented by 4 spaces.
        // Rows are deserialized with `data_types` purely for human-readable output.
        fn format_cache(
            f: &mut std::fmt::Formatter<'_>,
            cache: &Cache,
            data_types: &[DataType],
        ) -> std::fmt::Result {
            if cache.is_empty() {
                return write!(f, "    <empty>");
            }
            write!(
                f,
                "    {}",
                cache
                    .iter()
                    .format_with("\n    ", |item, f| f(&format_args!(
                        "{:?}, {}",
                        item.0,
                        item.1.deserialize(data_types).unwrap().display(),
                    )))
            )
        }

        writeln!(f, "  low:")?;
        if let Some(low) = &self.low {
            format_cache(f, low, &self.data_types)?;
        } else {
            // `low` is `None` when `offset == 0`.
            writeln!(f, "    <none>")?;
        }
        writeln!(f, "\n  middle:")?;
        format_cache(f, &self.middle, &self.data_types)?;
        writeln!(f, "\n  high:")?;
        format_cache(f, &self.high, &self.data_types)?;

        write!(f, "\n}}")?;
        Ok(())
    }
}
145
/// This trait is used as a bound. It is needed since
/// `TopNCache::<true>::f` and `TopNCache::<false>::f`
/// don't imply `TopNCache::<WITH_TIES>::f`.
pub trait TopNCacheTrait {
    /// Insert input row to corresponding cache range according to its order key.
    ///
    /// Changes in `self.middle` are recorded to `staging`, which will be
    /// used to generate messages to be sent to downstream operators.
    fn insert(&mut self, cache_key: CacheKey, row: impl Row, staging: &mut TopNStaging);

    /// Delete input row from the cache.
    ///
    /// Changes in `self.middle` are recorded to `staging`, which will be
    /// used to generate messages to be sent to downstream operators.
    ///
    /// Because we may need to refill data from the state table to `self.high` during the delete
    /// operation, we need to pass in `group_key` and `managed_state` to do a prefix
    /// scan of the state table.
    fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row,
        staging: &mut TopNStaging,
    ) -> impl Future<Output = StreamExecutorResult<()>> + Send;
}
173
174impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
175    /// `data_types` -- Data types for the full row.
176    /// `min_capacity` -- Minimum capacity for the high cache. When not provided, defaults to `TOPN_CACHE_MIN_CAPACITY`.
177    pub fn new(offset: usize, limit: usize, data_types: Vec<DataType>) -> Self {
178        Self::with_min_capacity(offset, limit, data_types, TOPN_CACHE_MIN_CAPACITY)
179    }
180
181    /// `data_types` -- Data types for the full row.
182    /// `min_capacity` -- Minimum capacity for the high cache.
183    pub fn with_min_capacity(
184        offset: usize,
185        limit: usize,
186        data_types: Vec<DataType>,
187        min_capacity: usize,
188    ) -> Self {
189        assert!(limit > 0);
190        if WITH_TIES {
191            // It's trickier to support.
192            // Also `OFFSET WITH TIES` has different semantic with `a < RANK() < b`
193            assert!(offset == 0, "OFFSET is not supported with WITH TIES");
194        }
195        let high_cache_capacity = offset
196            .checked_add(limit)
197            .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
198            .unwrap_or(usize::MAX)
199            .max(min_capacity);
200        Self {
201            low: if offset > 0 { Some(Cache::new()) } else { None },
202            middle: Cache::new(),
203            high: Cache::new(),
204            high_cache_capacity,
205            offset,
206            limit,
207            table_row_count: None,
208            data_types,
209        }
210    }
211
212    /// Clear the cache. After this, the cache must be `init` again before use.
213    #[allow(dead_code)]
214    pub fn clear(&mut self) {
215        self.low.as_mut().map(Cache::clear);
216        self.middle.clear();
217        self.high.clear();
218    }
219
220    /// Get total count of entries in the cache.
221    pub fn len(&self) -> usize {
222        self.low.as_ref().map(Cache::len).unwrap_or(0) + self.middle.len() + self.high.len()
223    }
224
225    pub(super) fn update_table_row_count(&mut self, table_row_count: usize) {
226        self.table_row_count = Some(table_row_count)
227    }
228
229    pub fn low_is_full(&self) -> bool {
230        if let Some(low) = &self.low {
231            assert!(low.len() <= self.offset);
232            let full = low.len() == self.offset;
233            if !full {
234                assert!(self.middle.is_empty());
235                assert!(self.high.is_empty());
236            }
237            full
238        } else {
239            true
240        }
241    }
242
243    pub fn middle_is_full(&self) -> bool {
244        // For WITH_TIES, the middle cache can exceed the capacity.
245        if !WITH_TIES {
246            assert!(
247                self.middle.len() <= self.limit,
248                "the middle cache exceeds the capacity\n{self:?}"
249            );
250        }
251        let full = self.middle.len() >= self.limit;
252        if full {
253            assert!(self.low_is_full());
254        } else {
255            assert!(
256                self.high.is_empty(),
257                "the high cache is not empty when middle cache is not full:\n{self:?}"
258            );
259        }
260        full
261    }
262
263    pub fn high_is_full(&self) -> bool {
264        // For WITH_TIES, the high cache can exceed the capacity.
265        if !WITH_TIES {
266            assert!(self.high.len() <= self.high_cache_capacity);
267        }
268        self.high.len() >= self.high_cache_capacity
269    }
270
271    fn high_is_synced(&self) -> bool {
272        if !self.high.is_empty() {
273            true
274        } else {
275            // check if table row count matches
276            self.table_row_count
277                .map(|n| n == self.len())
278                .unwrap_or(false)
279        }
280    }
281
282    fn last_cache_key_before_high(&self) -> Option<&CacheKey> {
283        let middle_last_key = self.middle.last_key_value().map(|(k, _)| k);
284        middle_last_key.or_else(|| {
285            self.low
286                .as_ref()
287                .and_then(Cache::last_key_value)
288                .map(|(k, _)| k)
289        })
290    }
291}
292
impl TopNCacheTrait for TopNCache<false> {
    /// Insert the row into whichever of low/middle/high it belongs to, cascading the
    /// displaced last entry of each cache down to the next one. Only changes to
    /// `middle` are reported to `staging`.
    fn insert(&mut self, cache_key: CacheKey, row: impl Row, staging: &mut TopNStaging) {
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count += 1;
        }

        // `to_insert` is rebound to the displaced entry as it cascades downwards.
        let mut to_insert = (cache_key, (&row).into());
        let mut is_last_of_lower_cache = false; // for saving one key comparison

        let low_is_full = self.low_is_full();
        if let Some(low) = &mut self.low {
            // try insert into low cache

            if !low_is_full {
                low.insert(to_insert.0, to_insert.1);
                return;
            }

            // low cache is full
            let low_last = low.last_entry().unwrap();
            if &to_insert.0 < low_last.key() {
                // make space for the new entry
                let low_last = low_last.remove_entry();
                low.insert(to_insert.0, to_insert.1);
                to_insert = low_last; // move the last entry to the middle cache
                is_last_of_lower_cache = true;
            }
        }

        // try insert into middle cache

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return;
        }

        // middle cache is full
        let middle_last = self.middle.last_entry().unwrap();
        if is_last_of_lower_cache || &to_insert.0 < middle_last.key() {
            // make space for the new entry
            let middle_last = middle_last.remove_entry();
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());

            staging.delete(middle_last.0.clone(), middle_last.1.clone());
            staging.insert(to_insert.0, to_insert.1);

            to_insert = middle_last; // move the last entry to the high cache
            is_last_of_lower_cache = true;
        }

        // try insert into high cache

        // The logic is a bit different from the other two caches, because high cache is not
        // guaranteed to be fully synced with the "high part" of the table.

        if is_last_of_lower_cache || self.high_is_synced() {
            // For `is_last_of_lower_cache`, an obvious observation is that the key to insert is
            // always smaller than any key in the high part of the table.

            if self.high.is_empty() {
                // if high cache is empty, we can insert directly
                self.high.insert(to_insert.0, to_insert.1);
                return;
            }

            let high_is_full = self.high_is_full();
            let high_last = self.high.last_entry().unwrap();

            if is_last_of_lower_cache || &to_insert.0 < high_last.key() {
                // we can only insert if the key is smaller than the largest key in the high cache
                if high_is_full {
                    // make space for the new entry
                    high_last.remove_entry();
                }
                self.high.insert(to_insert.0, to_insert.1);
            }
        }
    }

    /// Remove the row from whichever cache it falls into (located by comparing against
    /// the last keys of middle/low), then backfill middle from high (and low from
    /// middle), refilling high from the state table when it's unsynced.
    async fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row,
        staging: &mut TopNStaging,
    ) -> StreamExecutorResult<()> {
        if !enable_strict_consistency() && self.table_row_count == Some(0) {
            // If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we
            // should not panic. Instead, we pretend that we don't know about the actual row count.
            consistency_error!("table row count is 0, but we receive a DELETE operation");
            self.table_row_count = None;
        }
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count -= 1;
        }

        if self.middle_is_full() && &cache_key > self.middle.last_key_value().unwrap().0 {
            // the row is in high
            self.high.remove(&cache_key);
        } else if self.low_is_full()
            && self
                .low
                .as_ref()
                .map(|low| &cache_key > low.last_key_value().unwrap().0)
                .unwrap_or(
                    true, // if low is None, `cache_key` should be in middle
                )
        {
            // the row is in middle
            let removed = self.middle.remove(&cache_key);
            staging.delete(cache_key.clone(), (&row).into());

            if removed.is_none() {
                // the middle cache should always be synced, if the key is not found, then it also doesn't
                // exist in the state table
                consistency_error!(
                    ?group_key,
                    ?cache_key,
                    "cache key not found in middle cache"
                );
                return Ok(());
            }

            // refill the high cache if it's not synced
            if !self.high_is_synced() {
                self.high.clear();
                managed_state
                    .fill_high_cache(
                        group_key,
                        self,
                        self.last_cache_key_before_high().cloned(),
                        self.high_cache_capacity,
                    )
                    .await?;
            }

            // bring one element, if any, from high cache to middle cache
            if !self.high.is_empty() {
                let high_first = self.high.pop_first().unwrap();
                self.middle
                    .insert(high_first.0.clone(), high_first.1.clone());
                staging.insert(high_first.0, high_first.1);
            }

            // invariant: middle is full again whenever high still has entries
            assert!(self.high.is_empty() || self.middle.len() == self.limit);
        } else {
            // the row is in low
            let low = self.low.as_mut().unwrap();
            let removed = low.remove(&cache_key);

            if removed.is_none() {
                // the low cache should always be synced, if the key is not found, then it also doesn't
                // exist in the state table
                consistency_error!(?group_key, ?cache_key, "cache key not found in low cache");
                return Ok(());
            }

            // bring one element, if any, from middle cache to low cache
            if !self.middle.is_empty() {
                let middle_first = self.middle.pop_first().unwrap();
                staging.delete(middle_first.0.clone(), middle_first.1.clone());
                low.insert(middle_first.0, middle_first.1);

                // fill the high cache if it's not synced
                if !self.high_is_synced() {
                    self.high.clear();
                    managed_state
                        .fill_high_cache(
                            group_key,
                            self,
                            self.last_cache_key_before_high().cloned(),
                            self.high_cache_capacity,
                        )
                        .await?;
                }

                // bring one element, if any, from high cache to middle cache
                if !self.high.is_empty() {
                    let high_first = self.high.pop_first().unwrap();
                    self.middle
                        .insert(high_first.0.clone(), high_first.1.clone());
                    staging.insert(high_first.0, high_first.1);
                }
            }
        }

        Ok(())
    }
}
484
485impl TopNCacheTrait for TopNCache<true> {
486    fn insert(&mut self, cache_key: CacheKey, row: impl Row, staging: &mut TopNStaging) {
487        if let Some(row_count) = self.table_row_count.as_mut() {
488            *row_count += 1;
489        }
490
491        assert!(
492            self.low.is_none(),
493            "Offset is not supported yet for WITH TIES, so low cache should be None"
494        );
495
496        let to_insert: (CacheKey, CompactedRow) = (cache_key, (&row).into());
497
498        // try insert into middle cache
499
500        if !self.middle_is_full() {
501            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
502            staging.insert(to_insert.0.clone(), to_insert.1);
503            return;
504        }
505
506        // middle cache is full
507
508        let to_insert_sort_key = &(to_insert.0).0;
509        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();
510
511        match to_insert_sort_key.cmp(&middle_last_sort_key) {
512            Ordering::Less => {
513                // the row is in middle
514                let n_ties_of_last = self
515                    .middle
516                    .range((middle_last_sort_key.clone(), vec![])..)
517                    .count();
518                // We evict the last row and its ties only if the number of remaining rows still is
519                // still larger than limit, i.e., there are limit-1 other rows.
520                //
521                // e.g., limit = 3, [1,1,1,1]
522                // insert 0 -> [0,1,1,1,1]
523                // insert 0 -> [0,0,1,1,1,1]
524                // insert 0 -> [0,0,0]
525                if self.middle.len() + 1 - n_ties_of_last >= self.limit {
526                    // Middle will be full without the last element and its ties after insertion.
527                    // Let's move the last element and its ties to high cache first.
528                    while let Some(middle_last) = self.middle.last_entry()
529                        && middle_last.key().0 == middle_last_sort_key
530                    {
531                        let middle_last = middle_last.remove_entry();
532                        staging.delete(middle_last.0.clone(), middle_last.1.clone());
533                        // we can blindly move entries from middle cache to high cache no matter high cache is synced or not
534                        self.high.insert(middle_last.0, middle_last.1);
535                    }
536                }
537                if self.high.len() > self.high_cache_capacity {
538                    // evict some entries from high cache if it exceeds the capacity
539                    let high_last = self.high.pop_last().unwrap();
540                    let high_last_sort_key = (high_last.0).0;
541                    // Remove all ties of the last element in high cache, for the sake of simplicity.
542                    // This may cause repeatedly refill the high cache if number of ties is large.
543                    self.high.retain(|k, _| k.0 != high_last_sort_key);
544                }
545
546                self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
547                staging.insert(to_insert.0, to_insert.1);
548            }
549            Ordering::Equal => {
550                // the row is in middle and is a tie of the last row
551                self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
552                staging.insert(to_insert.0, to_insert.1);
553            }
554            Ordering::Greater => {
555                // the row is in high
556
557                if self.high_is_synced() {
558                    // only insert into high cache if it is synced
559
560                    if self.high.is_empty() {
561                        // if high cache is empty, we can insert directly
562                        self.high.insert(to_insert.0, to_insert.1);
563                        return;
564                    }
565
566                    if to_insert_sort_key <= &self.high.last_key().unwrap().0 {
567                        // We can only insert if the key is <= the largest key in the high cache.
568                        // Note that we have all ties of the last element in the high cache, so we can
569                        // safely compare only the sort key.
570                        self.high.insert(to_insert.0, to_insert.1);
571                    }
572
573                    if self.high.len() > self.high_cache_capacity {
574                        // evict some entries from high cache if it exceeds the capacity
575                        let high_last = self.high.pop_last().unwrap();
576                        let high_last_sort_key = (high_last.0).0;
577                        // Remove all ties of the last element in high cache, for the sake of simplicity.
578                        // This may cause repeatedly refill the high cache if number of ties is large.
579                        self.high.retain(|k, _| k.0 != high_last_sort_key);
580                    }
581                }
582            }
583        }
584    }
585
586    async fn delete<S: StateStore>(
587        &mut self,
588        group_key: Option<impl GroupKey>,
589        managed_state: &mut ManagedTopNState<S>,
590        cache_key: CacheKey,
591        row: impl Row,
592        staging: &mut TopNStaging,
593    ) -> StreamExecutorResult<()> {
594        if !enable_strict_consistency() && self.table_row_count == Some(0) {
595            // If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we
596            // should not panic. Instead, we pretend that we don't know about the actually row count.
597            self.table_row_count = None;
598        }
599        if let Some(row_count) = self.table_row_count.as_mut() {
600            *row_count -= 1;
601        }
602
603        assert!(
604            self.low.is_none(),
605            "Offset is not supported yet for WITH TIES, so low cache should be None"
606        );
607
608        if self.middle.is_empty() {
609            consistency_error!(
610                ?group_key,
611                ?cache_key,
612                "middle cache is empty, but we receive a DELETE operation"
613            );
614            staging.delete(cache_key, (&row).into());
615            return Ok(());
616        }
617
618        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();
619
620        let to_delete_sort_key = cache_key.0.clone();
621        if to_delete_sort_key > middle_last_sort_key {
622            // the row is in high
623            self.high.remove(&cache_key);
624        } else {
625            // the row is in middle
626            self.middle.remove(&cache_key);
627            staging.delete(cache_key.clone(), (&row).into());
628            if self.middle.len() >= self.limit {
629                // this can happen when there are ties
630                return Ok(());
631            }
632
633            // refill the high cache if it's not synced
634            if !self.high_is_synced() {
635                managed_state
636                    .fill_high_cache(
637                        group_key,
638                        self,
639                        self.last_cache_key_before_high().cloned(),
640                        self.high_cache_capacity,
641                    )
642                    .await?;
643            }
644
645            // bring the first element and its ties, if any, from high cache to middle cache
646            if !self.high.is_empty() {
647                let high_first = self.high.pop_first().unwrap();
648                let high_first_sort_key = (high_first.0).0.clone();
649                assert!(high_first_sort_key > middle_last_sort_key);
650
651                self.middle
652                    .insert(high_first.0.clone(), high_first.1.clone());
653                staging.insert(high_first.0, high_first.1);
654
655                for (cache_key, row) in self.high.extract_if(|k, _| k.0 == high_first_sort_key) {
656                    self.middle.insert(cache_key.clone(), row.clone());
657                    staging.insert(cache_key, row);
658                }
659            }
660        }
661
662        Ok(())
663    }
664}
665
/// Similar to [`TopNCacheTrait`], but for append-only TopN.
pub trait AppendOnlyTopNCacheTrait {
    /// Insert input row to corresponding cache range according to its order key.
    ///
    /// Changes in `self.middle` are recorded to `staging`, which will be
    /// used to generate messages to be sent to downstream operators.
    ///
    /// `managed_state` is required because different from normal TopN, append-only TopN
    /// doesn't insert all rows into the state table.
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()>;
}
684
impl AppendOnlyTopNCacheTrait for TopNCache<false> {
    /// Insert the row into low or middle, writing to / deleting from the state table as
    /// needed. Rows that don't make it into `[0, offset+limit)` are discarded entirely
    /// (not even persisted), since without deletes they can never rise into the top-N.
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()> {
        // Row sorts at/after the last row of a full middle: it's outside the top-N,
        // so skip both the cache and the state table.
        if self.middle_is_full() && &cache_key >= self.middle.last_key().unwrap() {
            return Ok(());
        }
        // The row will be kept somewhere in low/middle, so persist it.
        managed_state.insert(row_ref);

        // insert input row into corresponding cache according to its sort key
        let mut to_insert = (cache_key, row_ref.into());

        let low_is_full = self.low_is_full();
        if let Some(low) = &mut self.low {
            // try insert into low cache

            if !low_is_full {
                low.insert(to_insert.0, to_insert.1);
                return Ok(());
            }

            // low cache is full
            let low_last = low.last_entry().unwrap();
            if &to_insert.0 < low_last.key() {
                // make space for the new entry
                let low_last = low_last.remove_entry();
                low.insert(to_insert.0, to_insert.1);
                to_insert = low_last; // move the last entry to the middle cache
            }
        }

        // try insert into middle cache

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return Ok(());
        }

        // The row must be in the range of [offset, offset+limit).
        // the largest row in `cache.middle` needs to be removed.
        let middle_last = self.middle.pop_last().unwrap();
        debug_assert!(to_insert.0 < middle_last.0);
        // The evicted row also leaves the state table, since append-only TopN keeps no high part.
        managed_state.delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
        staging.delete(middle_last.0, middle_last.1);

        self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
        staging.insert(to_insert.0, to_insert.1);

        // Unlike normal topN, append only topN does not use the high part of the cache.

        Ok(())
    }
}
744
745impl AppendOnlyTopNCacheTrait for TopNCache<true> {
746    fn insert<S: StateStore>(
747        &mut self,
748        cache_key: CacheKey,
749        row_ref: RowRef<'_>,
750        staging: &mut TopNStaging,
751        managed_state: &mut ManagedTopNState<S>,
752        row_deserializer: &RowDeserializer,
753    ) -> StreamExecutorResult<()> {
754        assert!(
755            self.low.is_none(),
756            "Offset is not supported yet for WITH TIES, so low cache should be empty"
757        );
758
759        let to_insert = (cache_key, row_ref);
760
761        // try insert into middle cache
762
763        if !self.middle_is_full() {
764            managed_state.insert(to_insert.1);
765            let row: CompactedRow = to_insert.1.into();
766            self.middle.insert(to_insert.0.clone(), row.clone());
767            staging.insert(to_insert.0, row);
768            return Ok(());
769        }
770
771        // middle cache is full
772
773        let to_insert_sort_key = &(to_insert.0).0;
774        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();
775
776        match to_insert_sort_key.cmp(&middle_last_sort_key) {
777            Ordering::Less => {
778                // the row is in middle
779                let n_ties_of_last = self
780                    .middle
781                    .range((middle_last_sort_key.clone(), vec![])..)
782                    .count();
783                // We evict the last row and its ties only if the number of remaining rows is
784                // still larger than limit, i.e., there are limit-1 other rows.
785                //
786                // e.g., limit = 3, [1,1,1,1]
787                // insert 0 -> [0,1,1,1,1]
788                // insert 0 -> [0,0,1,1,1,1]
789                // insert 0 -> [0,0,0]
790                if self.middle.len() + 1 - n_ties_of_last >= self.limit {
791                    // middle will be full without the last element and its ties after insertion
792                    while let Some(middle_last) = self.middle.last_entry()
793                        && middle_last.key().0 == middle_last_sort_key
794                    {
795                        let middle_last = middle_last.remove_entry();
796                        // we don't need to maintain the high part so just delete it from state table
797                        managed_state
798                            .delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
799                        staging.delete(middle_last.0, middle_last.1);
800                    }
801                }
802
803                managed_state.insert(to_insert.1);
804                let row: CompactedRow = to_insert.1.into();
805                self.middle.insert(to_insert.0.clone(), row.clone());
806                staging.insert(to_insert.0, row);
807            }
808            Ordering::Equal => {
809                // the row is in middle and is a tie of the last row
810                managed_state.insert(to_insert.1);
811                let row: CompactedRow = to_insert.1.into();
812                self.middle.insert(to_insert.0.clone(), row.clone());
813                staging.insert(to_insert.0, row);
814            }
815            Ordering::Greater => {
816                // the row is in high, do nothing
817            }
818        }
819
820        Ok(())
821    }
822}
823
/// Used to build diff between before and after applying an input chunk, for `TopNCache` (of one group).
/// It should be maintained when an entry is inserted or deleted from the `middle` cache.
#[derive(Debug, Default)]
pub struct TopNStaging {
    /// Buffered changes to the `middle` cache, keyed by [`CacheKey`].
    /// NOTE(review): `ChangeBuffer` presumably compacts intermediate changes
    /// (e.g. insert-then-delete of the same key) — confirm against its impl.
    inner: ChangeBuffer<CacheKey, CompactedRow>,
}
830
831impl TopNStaging {
832    pub fn new() -> Self {
833        Self::default()
834    }
835
836    /// Insert a row into the staging changes. This method must be called when a row is
837    /// added to the `middle` cache.
838    fn insert(&mut self, cache_key: CacheKey, row: CompactedRow) {
839        self.inner.insert(cache_key, row);
840    }
841
842    /// Delete a row from the staging changes. This method must be called when a row is
843    /// removed from the `middle` cache.
844    fn delete(&mut self, cache_key: CacheKey, row: CompactedRow) {
845        self.inner.delete(cache_key, row);
846    }
847
848    /// Get the count of effective changes in the staging.
849    pub fn len(&self) -> usize {
850        self.inner.len()
851    }
852
853    /// Check if the staging is empty.
854    pub fn is_empty(&self) -> bool {
855        self.inner.is_empty()
856    }
857
858    /// Iterate over the changes in the staging, and deserialize the rows.
859    pub fn into_deserialized_changes(
860        self,
861        deserializer: &RowDeserializer,
862    ) -> impl Iterator<Item = StreamExecutorResult<Record<OwnedRow>>> + '_ {
863        self.inner.into_records().map(|record| {
864            record.try_map(|row| {
865                deserializer
866                    .deserialize(row.row.as_ref())
867                    .map_err(Into::into)
868            })
869        })
870    }
871}
872
#[cfg(test)]
mod tests {
    use risingwave_common::types::DataType;

    use super::*;

    #[test]
    fn test_topn_cache_new_uses_default_min_capacity() {
        // `new` falls back to `TOPN_CACHE_MIN_CAPACITY` when the
        // `(offset + limit) * factor` formula gives a smaller value.
        let cache = TopNCache::<false>::new(0, 5, vec![DataType::Int32]);
        assert_eq!(cache.high_cache_capacity, TOPN_CACHE_MIN_CAPACITY);
    }

    #[test]
    fn test_topn_cache_with_custom_min_capacity() {
        let min_cap = 25;
        let cache =
            TopNCache::<false>::with_min_capacity(0, 5, vec![DataType::Int32], min_cap);
        assert_eq!(cache.high_cache_capacity, min_cap);
    }

    #[test]
    fn test_topn_cache_high_capacity_factor_respected() {
        // With a tiny min capacity, the `(offset + limit) * factor`
        // formula dominates the high cache capacity.
        let (offset, limit) = (2, 5);
        let cache = TopNCache::<false>::with_min_capacity(
            offset,
            limit,
            vec![DataType::Int32],
            1,
        );
        assert_eq!(
            cache.high_cache_capacity,
            (offset + limit) * TOPN_CACHE_HIGH_CAPACITY_FACTOR
        );
    }

    #[test]
    fn test_topn_cache_min_capacity_takes_precedence_when_larger() {
        let min_cap = 100;
        let (offset, limit) = (0, 5);
        let from_formula = (offset + limit) * TOPN_CACHE_HIGH_CAPACITY_FACTOR; // = 10
        let cache = TopNCache::<false>::with_min_capacity(
            offset,
            limit,
            vec![DataType::Int32],
            min_cap,
        );
        assert_eq!(cache.high_cache_capacity, min_cap);
        assert!(cache.high_cache_capacity > from_formula);
    }
}