risingwave_stream/executor/top_n/top_n_cache.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::future::Future;

use itertools::Itertools;
use risingwave_common::array::{Op, RowRef};
use risingwave_common::row::{CompactedRow, OwnedRow, Row, RowDeserializer, RowExt};
use risingwave_common::types::DataType;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
use risingwave_storage::StateStore;

use super::{GroupKey, ManagedTopNState};
use crate::consistency::{consistency_error, enable_strict_consistency};
use crate::executor::error::StreamExecutorResult;

/// `CacheKey` is composed of `(order_by, remaining columns of pk)`.
pub type CacheKey = (Vec<u8>, Vec<u8>);
pub type Cache = EstimatedBTreeMap<CacheKey, CompactedRow>;
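
// The high cache capacity is computed as `(offset + limit) * TOPN_CACHE_HIGH_CAPACITY_FACTOR`,
// clamped below by a minimum capacity (`TOPN_CACHE_MIN_CAPACITY` unless overridden via
// `TopNCache::with_min_capacity`).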
const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
const TOPN_CACHE_MIN_CAPACITY: usize = 10;

/// Cache for [`ManagedTopNState`].
///
/// The key in the maps, [`CacheKey`], is `(order_by, remaining columns of pk)`. `group_key` is
/// not included.
///
/// # `WITH_TIES`
///
/// `WITH_TIES` supports the semantics of `FETCH FIRST n ROWS WITH TIES` and `RANK() <= n`.
///
/// `OFFSET m FETCH FIRST n ROWS WITH TIES` and `m <= RANK() <= n` are not supported yet,
/// since they have different semantics.
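///
/// For example, with `LIMIT 2 WITH TIES` over the ordered values `1, 2, 2, 3`, all of
/// `1, 2, 2` are emitted, since the trailing `2` ties with the last in-limit row.
///
/// # Cache layout
///
/// With `OFFSET 2 LIMIT 3` over ordered cache keys `a` to `g`, `low` holds `{a, b}`,
/// `middle` holds `{c, d, e}`, and `high` caches a prefix of the remaining rows.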
pub struct TopNCache<const WITH_TIES: bool> {
    /// Rows in the range `[0, offset)`. Should always be synced with the state table.
    pub low: Option<Cache>,

    /// Rows in the range `[offset, offset+limit)`. Should always be synced with the state table.
    ///
    /// When `WITH_TIES` is true, it also stores ties for the last element,
    /// and thus the size can be larger than `limit`.
    pub middle: Cache,

    /// Cache of the beginning rows in the range `[offset+limit, ...)`.
    ///
    /// This is very similar to [`TopNStateCache`], which only caches the top-N rows in the table
    /// and only accepts new records that are less than the largest in the cache.
    ///
    /// When `WITH_TIES` is true, it guarantees that the ties of the last element are in the cache,
    /// and thus the size can be larger than `rest_cache_capacity`.
    ///
    /// When the cache becomes empty, if `table_row_count` doesn't match, the cache must be
    /// treated as unsynced and refilled from the state table.
    ///
    /// TODO(rc): later we should reuse [`TopNStateCache`] here.
    ///
    /// [`TopNStateCache`]: crate::common::state_cache::TopNStateCache
    pub high: Cache,
    pub high_cache_capacity: usize,

    pub offset: usize,
    /// Assumption: `limit != 0`
    pub limit: usize,

    /// Number of rows corresponding to the current group.
    /// This is nice-to-have information. `None` means we don't know the row count,
    /// but it doesn't prevent us from working correctly.
    table_row_count: Option<usize>,

    /// Data types for the full row.
    ///
    /// For debug formatting only.
    data_types: Vec<DataType>,
}

impl<const WITH_TIES: bool> EstimateSize for TopNCache<WITH_TIES> {
    fn estimated_heap_size(&self) -> usize {
        self.low.estimated_heap_size()
            + self.middle.estimated_heap_size()
            + self.high.estimated_heap_size()
    }
}

impl<const WITH_TIES: bool> Debug for TopNCache<WITH_TIES> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "TopNCache {{\n  offset: {}, limit: {}, high_cache_capacity: {},\n",
            self.offset, self.limit, self.high_cache_capacity
        )?;

        fn format_cache(
            f: &mut std::fmt::Formatter<'_>,
            cache: &Cache,
            data_types: &[DataType],
        ) -> std::fmt::Result {
            if cache.is_empty() {
                return write!(f, "    <empty>");
            }
            write!(
                f,
                "    {}",
                cache
                    .iter()
                    .format_with("\n    ", |item, f| f(&format_args!(
                        "{:?}, {}",
                        item.0,
                        item.1.deserialize(data_types).unwrap().display(),
                    )))
            )
        }

        writeln!(f, "  low:")?;
        if let Some(low) = &self.low {
            format_cache(f, low, &self.data_types)?;
        } else {
            writeln!(f, "    <none>")?;
        }
        writeln!(f, "\n  middle:")?;
        format_cache(f, &self.middle, &self.data_types)?;
        writeln!(f, "\n  high:")?;
        format_cache(f, &self.high, &self.data_types)?;

        write!(f, "\n}}")?;
        Ok(())
    }
}

/// This trait is used as a bound. It is needed since
/// `TopNCache::<true>::f` and `TopNCache::<false>::f`
/// don't imply `TopNCache::<WITH_TIES>::f`.
pub trait TopNCacheTrait {
    /// Insert the input row into the corresponding cache range according to its order key.
    ///
    /// Changes in `self.middle` are recorded to `staging`, which will be used to generate
    /// messages to be sent to downstream operators.
    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging);

    /// Delete the input row from the cache.
    ///
    /// Changes in `self.middle` are recorded to `staging`, which will be used to generate
    /// messages to be sent to downstream operators.
    ///
    /// Because we may need to refill `self.high` from the state table during the delete
    /// operation, we need to pass in `group_key` and `managed_state` to do a prefix scan
    /// of the state table.
    fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row + Send,
        staging: &mut TopNStaging,
    ) -> impl Future<Output = StreamExecutorResult<()>> + Send;
}

impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
    /// `data_types` -- Data types for the full row.
    ///
    /// The high cache's minimum capacity defaults to `TOPN_CACHE_MIN_CAPACITY`.
    pub fn new(offset: usize, limit: usize, data_types: Vec<DataType>) -> Self {
        Self::with_min_capacity(offset, limit, data_types, TOPN_CACHE_MIN_CAPACITY)
    }

    /// `data_types` -- Data types for the full row.
    /// `min_capacity` -- Minimum capacity for the high cache.
    pub fn with_min_capacity(
        offset: usize,
        limit: usize,
        data_types: Vec<DataType>,
        min_capacity: usize,
    ) -> Self {
        assert!(limit > 0);
        if WITH_TIES {
            // It's trickier to support.
            // Also, `OFFSET WITH TIES` has different semantics from `a < RANK() < b`.
            assert!(offset == 0, "OFFSET is not supported with WITH TIES");
        }
        let high_cache_capacity = offset
            .checked_add(limit)
            .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
            .unwrap_or(usize::MAX)
            .max(min_capacity);
        Self {
            low: if offset > 0 { Some(Cache::new()) } else { None },
            middle: Cache::new(),
            high: Cache::new(),
            high_cache_capacity,
            offset,
            limit,
            table_row_count: None,
            data_types,
        }
    }

    /// Clear the cache. After this, the cache must be `init` again before use.
    #[allow(dead_code)]
    pub fn clear(&mut self) {
        if let Some(low) = self.low.as_mut() {
            low.clear();
        }
        self.middle.clear();
        self.high.clear();
    }

    /// Get the total count of entries in the cache.
    pub fn len(&self) -> usize {
        self.low.as_ref().map(Cache::len).unwrap_or(0) + self.middle.len() + self.high.len()
    }

    pub(super) fn update_table_row_count(&mut self, table_row_count: usize) {
        self.table_row_count = Some(table_row_count)
    }

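    /// Whether the low cache is full, i.e., it contains exactly `offset` rows. Trivially
    /// true when there is no low cache (`offset == 0`).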
    pub fn low_is_full(&self) -> bool {
        if let Some(low) = &self.low {
            assert!(low.len() <= self.offset);
            let full = low.len() == self.offset;
            if !full {
                assert!(self.middle.is_empty());
                assert!(self.high.is_empty());
            }
            full
        } else {
            true
        }
    }

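    /// Whether the middle cache is full, i.e., it contains at least `limit` rows
    /// (with `WITH_TIES`, ties may push it above `limit`).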
    pub fn middle_is_full(&self) -> bool {
        // For WITH_TIES, the middle cache can exceed the capacity.
        if !WITH_TIES {
            assert!(
                self.middle.len() <= self.limit,
                "the middle cache exceeds the capacity\n{self:?}"
            );
        }
        let full = self.middle.len() >= self.limit;
        if full {
            assert!(self.low_is_full());
        } else {
            assert!(
                self.high.is_empty(),
                "the high cache is not empty when middle cache is not full:\n{self:?}"
            );
        }
        full
    }

    pub fn high_is_full(&self) -> bool {
        // For WITH_TIES, the high cache can exceed the capacity.
        if !WITH_TIES {
            assert!(self.high.len() <= self.high_cache_capacity);
        }
        self.high.len() >= self.high_cache_capacity
    }

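    /// Whether the high cache can be trusted as a prefix of the "high part" of the table:
    /// either it is non-empty, or the known row count proves there is nothing beyond the
    /// cached rows.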
    fn high_is_synced(&self) -> bool {
        if !self.high.is_empty() {
            true
        } else {
            // check if table row count matches
            self.table_row_count
                .map(|n| n == self.len())
                .unwrap_or(false)
        }
    }

    fn last_cache_key_before_high(&self) -> Option<&CacheKey> {
        let middle_last_key = self.middle.last_key_value().map(|(k, _)| k);
        middle_last_key.or_else(|| {
            self.low
                .as_ref()
                .and_then(Cache::last_key_value)
                .map(|(k, _)| k)
        })
    }
}

impl TopNCacheTrait for TopNCache<false> {
    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging) {
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count += 1;
        }

        let mut to_insert = (cache_key, (&row).into());
        let mut is_last_of_lower_cache = false; // to save one key comparison

        let low_is_full = self.low_is_full();
        if let Some(low) = &mut self.low {
            // try to insert into the low cache

            if !low_is_full {
                low.insert(to_insert.0, to_insert.1);
                return;
            }

            // low cache is full
            let low_last = low.last_entry().unwrap();
            if &to_insert.0 < low_last.key() {
                // make space for the new entry
                let low_last = low_last.remove_entry();
                low.insert(to_insert.0, to_insert.1);
                to_insert = low_last; // move the last entry to the middle cache
                is_last_of_lower_cache = true;
            }
        }

        // try to insert into the middle cache

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return;
        }

        // middle cache is full
        let middle_last = self.middle.last_entry().unwrap();
        if is_last_of_lower_cache || &to_insert.0 < middle_last.key() {
            // make space for the new entry
            let middle_last = middle_last.remove_entry();
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());

            staging.delete(middle_last.0.clone(), middle_last.1.clone());
            staging.insert(to_insert.0, to_insert.1);

            to_insert = middle_last; // move the last entry to the high cache
            is_last_of_lower_cache = true;
        }

        // try to insert into the high cache

        // The logic is a bit different from the other two caches, because the high cache is not
        // guaranteed to be fully synced with the "high part" of the table.

        if is_last_of_lower_cache || self.high_is_synced() {
            // For `is_last_of_lower_cache`, an obvious observation is that the key to insert is
            // always smaller than any key in the high part of the table.

            if self.high.is_empty() {
                // if the high cache is empty, we can insert directly
                self.high.insert(to_insert.0, to_insert.1);
                return;
            }

            let high_is_full = self.high_is_full();
            let high_last = self.high.last_entry().unwrap();

            if is_last_of_lower_cache || &to_insert.0 < high_last.key() {
                // we can only insert if the key is smaller than the largest key in the high cache
                if high_is_full {
                    // make space for the new entry
                    high_last.remove_entry();
                }
                self.high.insert(to_insert.0, to_insert.1);
            }
        }
    }

    async fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row + Send,
        staging: &mut TopNStaging,
    ) -> StreamExecutorResult<()> {
        if !enable_strict_consistency() && self.table_row_count == Some(0) {
            // If strict consistency is disabled and we receive a `DELETE` while the row count is
            // 0, we should not panic. Instead, we pretend that we don't know the actual row count.
            consistency_error!("table row count is 0, but we receive a DELETE operation");
            self.table_row_count = None;
        }
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count -= 1;
        }

        if self.middle_is_full() && &cache_key > self.middle.last_key_value().unwrap().0 {
            // the row is in high
            self.high.remove(&cache_key);
        } else if self.low_is_full()
            && self
                .low
                .as_ref()
                .map(|low| &cache_key > low.last_key_value().unwrap().0)
                .unwrap_or(
                    true, // if low is None, `cache_key` should be in middle
                )
        {
            // the row is in middle
            let removed = self.middle.remove(&cache_key);
            staging.delete(cache_key.clone(), (&row).into());

            if removed.is_none() {
                // The middle cache should always be synced; if the key is not found, then it
                // also doesn't exist in the state table.
                consistency_error!(
                    ?group_key,
                    ?cache_key,
                    "cache key not found in middle cache"
                );
                return Ok(());
            }

            // refill the high cache if it's not synced
            if !self.high_is_synced() {
                self.high.clear();
                managed_state
                    .fill_high_cache(
                        group_key,
                        self,
                        self.last_cache_key_before_high().cloned(),
                        self.high_cache_capacity,
                    )
                    .await?;
            }

            // bring one element, if any, from the high cache to the middle cache
            if !self.high.is_empty() {
                let high_first = self.high.pop_first().unwrap();
                self.middle
                    .insert(high_first.0.clone(), high_first.1.clone());
                staging.insert(high_first.0, high_first.1);
            }

            assert!(self.high.is_empty() || self.middle.len() == self.limit);
        } else {
            // the row is in low
            let low = self.low.as_mut().unwrap();
            let removed = low.remove(&cache_key);

            if removed.is_none() {
                // The low cache should always be synced; if the key is not found, then it
                // also doesn't exist in the state table.
                consistency_error!(?group_key, ?cache_key, "cache key not found in low cache");
                return Ok(());
            }

            // bring one element, if any, from the middle cache to the low cache
            if !self.middle.is_empty() {
                let middle_first = self.middle.pop_first().unwrap();
                staging.delete(middle_first.0.clone(), middle_first.1.clone());
                low.insert(middle_first.0, middle_first.1);

                // refill the high cache if it's not synced
                if !self.high_is_synced() {
                    self.high.clear();
                    managed_state
                        .fill_high_cache(
                            group_key,
                            self,
                            self.last_cache_key_before_high().cloned(),
                            self.high_cache_capacity,
                        )
                        .await?;
                }

                // bring one element, if any, from the high cache to the middle cache
                if !self.high.is_empty() {
                    let high_first = self.high.pop_first().unwrap();
                    self.middle
                        .insert(high_first.0.clone(), high_first.1.clone());
                    staging.insert(high_first.0, high_first.1);
                }
            }
        }

        Ok(())
    }
}

impl TopNCacheTrait for TopNCache<true> {
    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging) {
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count += 1;
        }

        assert!(
            self.low.is_none(),
            "Offset is not supported yet for WITH TIES, so low cache should be None"
        );

        let to_insert: (CacheKey, CompactedRow) = (cache_key, (&row).into());

        // try to insert into the middle cache

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return;
        }

        // middle cache is full

        let to_insert_sort_key = &(to_insert.0).0;
        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();

        match to_insert_sort_key.cmp(&middle_last_sort_key) {
            Ordering::Less => {
                // the row is in middle
                let n_ties_of_last = self
                    .middle
                    .range((middle_last_sort_key.clone(), vec![])..)
                    .count();
                // We evict the last row and its ties only if the number of remaining rows is
                // still at least `limit`, i.e., there are at least `limit - 1` other rows.
                //
                // e.g., limit = 3, [1,1,1,1]
                // insert 0 -> [0,1,1,1,1]
                // insert 0 -> [0,0,1,1,1,1]
                // insert 0 -> [0,0,0]
                if self.middle.len() + 1 - n_ties_of_last >= self.limit {
                    // Middle will be full without the last element and its ties after insertion.
                    // Let's move the last element and its ties to the high cache first.
                    while let Some(middle_last) = self.middle.last_entry()
                        && middle_last.key().0 == middle_last_sort_key
                    {
                        let middle_last = middle_last.remove_entry();
                        staging.delete(middle_last.0.clone(), middle_last.1.clone());
                        // We can blindly move entries from the middle cache to the high cache,
                        // regardless of whether the high cache is synced.
                        self.high.insert(middle_last.0, middle_last.1);
                    }
                }
                if self.high.len() > self.high_cache_capacity {
                    // evict some entries from the high cache if it exceeds the capacity
                    let high_last = self.high.pop_last().unwrap();
                    let high_last_sort_key = (high_last.0).0;
                    // Remove all ties of the last element in the high cache, for the sake of
                    // simplicity. This may cause the high cache to be refilled repeatedly if
                    // the number of ties is large.
                    self.high.retain(|k, _| k.0 != high_last_sort_key);
                }

                self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
                staging.insert(to_insert.0, to_insert.1);
            }
            Ordering::Equal => {
                // the row is in middle and is a tie of the last row
                self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
                staging.insert(to_insert.0, to_insert.1);
            }
            Ordering::Greater => {
                // the row is in high

                if self.high_is_synced() {
                    // only insert into the high cache if it is synced

                    if self.high.is_empty() {
                        // if the high cache is empty, we can insert directly
                        self.high.insert(to_insert.0, to_insert.1);
                        return;
                    }

                    if to_insert_sort_key <= &self.high.last_key().unwrap().0 {
                        // We can only insert if the key is <= the largest key in the high cache.
                        // Note that we have all ties of the last element in the high cache, so we
                        // can safely compare only the sort key.
                        self.high.insert(to_insert.0, to_insert.1);
                    }

                    if self.high.len() > self.high_cache_capacity {
                        // evict some entries from the high cache if it exceeds the capacity
                        let high_last = self.high.pop_last().unwrap();
                        let high_last_sort_key = (high_last.0).0;
                        // Remove all ties of the last element in the high cache, for the sake of
                        // simplicity. This may cause the high cache to be refilled repeatedly if
                        // the number of ties is large.
                        self.high.retain(|k, _| k.0 != high_last_sort_key);
                    }
                }
            }
        }
    }

    async fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row + Send,
        staging: &mut TopNStaging,
    ) -> StreamExecutorResult<()> {
        if !enable_strict_consistency() && self.table_row_count == Some(0) {
            // If strict consistency is disabled and we receive a `DELETE` while the row count is
            // 0, we should not panic. Instead, we pretend that we don't know the actual row count.
            self.table_row_count = None;
        }
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count -= 1;
        }

        assert!(
            self.low.is_none(),
            "Offset is not supported yet for WITH TIES, so low cache should be None"
        );

        if self.middle.is_empty() {
            consistency_error!(
                ?group_key,
                ?cache_key,
                "middle cache is empty, but we receive a DELETE operation"
            );
            staging.delete(cache_key, (&row).into());
            return Ok(());
        }

        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();

        let to_delete_sort_key = cache_key.0.clone();
        if to_delete_sort_key > middle_last_sort_key {
            // the row is in high
            self.high.remove(&cache_key);
        } else {
            // the row is in middle
            self.middle.remove(&cache_key);
            staging.delete(cache_key.clone(), (&row).into());
            if self.middle.len() >= self.limit {
                // this can happen when there are ties
                return Ok(());
            }

            // refill the high cache if it's not synced
            if !self.high_is_synced() {
                managed_state
                    .fill_high_cache(
                        group_key,
                        self,
                        self.last_cache_key_before_high().cloned(),
                        self.high_cache_capacity,
                    )
                    .await?;
            }

            // bring the first element and its ties, if any, from the high cache to the middle cache
            if !self.high.is_empty() {
                let high_first = self.high.pop_first().unwrap();
                let high_first_sort_key = (high_first.0).0.clone();
                assert!(high_first_sort_key > middle_last_sort_key);

                self.middle
                    .insert(high_first.0.clone(), high_first.1.clone());
                staging.insert(high_first.0, high_first.1);

                for (cache_key, row) in self.high.extract_if(|k, _| k.0 == high_first_sort_key) {
                    self.middle.insert(cache_key.clone(), row.clone());
                    staging.insert(cache_key, row);
                }
            }
        }

        Ok(())
    }
}

/// Similar to [`TopNCacheTrait`], but for append-only TopN.
pub trait AppendOnlyTopNCacheTrait {
    /// Insert the input row into the corresponding cache range according to its order key.
    ///
    /// Changes in `self.middle` are recorded to `staging`, which will be used to generate
    /// messages to be sent to downstream operators.
    ///
    /// `managed_state` is required because, unlike normal TopN, append-only TopN
    /// doesn't insert all rows into the state table.
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()>;
}

impl AppendOnlyTopNCacheTrait for TopNCache<false> {
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()> {
        if self.middle_is_full() && &cache_key >= self.middle.last_key().unwrap() {
            return Ok(());
        }
        managed_state.insert(row_ref);

        // insert the input row into the corresponding cache according to its sort key
        let mut to_insert = (cache_key, row_ref.into());

        let low_is_full = self.low_is_full();
        if let Some(low) = &mut self.low {
            // try to insert into the low cache

            if !low_is_full {
                low.insert(to_insert.0, to_insert.1);
                return Ok(());
            }

            // low cache is full
            let low_last = low.last_entry().unwrap();
            if &to_insert.0 < low_last.key() {
                // make space for the new entry
                let low_last = low_last.remove_entry();
                low.insert(to_insert.0, to_insert.1);
                to_insert = low_last; // move the last entry to the middle cache
            }
        }

        // try to insert into the middle cache

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return Ok(());
        }

        // The row must be in the range `[offset, offset+limit)`, so the largest row
        // in `self.middle` needs to be removed.
        let middle_last = self.middle.pop_last().unwrap();
        debug_assert!(to_insert.0 < middle_last.0);
        managed_state.delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
        staging.delete(middle_last.0, middle_last.1);

        self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
        staging.insert(to_insert.0, to_insert.1);

        // Unlike normal TopN, append-only TopN does not use the high part of the cache.

        Ok(())
    }
}

impl AppendOnlyTopNCacheTrait for TopNCache<true> {
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()> {
        assert!(
            self.low.is_none(),
            "Offset is not supported yet for WITH TIES, so low cache should be None"
        );

        let to_insert = (cache_key, row_ref);

        // try to insert into the middle cache

        if !self.middle_is_full() {
            managed_state.insert(to_insert.1);
            let row: CompactedRow = to_insert.1.into();
            self.middle.insert(to_insert.0.clone(), row.clone());
            staging.insert(to_insert.0, row);
            return Ok(());
        }

        // middle cache is full

        let to_insert_sort_key = &(to_insert.0).0;
        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();

        match to_insert_sort_key.cmp(&middle_last_sort_key) {
            Ordering::Less => {
                // the row is in middle
                let n_ties_of_last = self
                    .middle
                    .range((middle_last_sort_key.clone(), vec![])..)
                    .count();
                // We evict the last row and its ties only if the number of remaining rows is
                // still at least `limit`, i.e., there are at least `limit - 1` other rows.
                //
                // e.g., limit = 3, [1,1,1,1]
                // insert 0 -> [0,1,1,1,1]
                // insert 0 -> [0,0,1,1,1,1]
                // insert 0 -> [0,0,0]
                if self.middle.len() + 1 - n_ties_of_last >= self.limit {
                    // middle will be full without the last element and its ties after insertion
                    while let Some(middle_last) = self.middle.last_entry()
                        && middle_last.key().0 == middle_last_sort_key
                    {
                        let middle_last = middle_last.remove_entry();
                        // we don't need to maintain the high part, so just delete it from the
                        // state table
                        managed_state
                            .delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
                        staging.delete(middle_last.0, middle_last.1);
                    }
                }

                managed_state.insert(to_insert.1);
                let row: CompactedRow = to_insert.1.into();
                self.middle.insert(to_insert.0.clone(), row.clone());
                staging.insert(to_insert.0, row);
            }
            Ordering::Equal => {
                // the row is in middle and is a tie of the last row
                managed_state.insert(to_insert.1);
                let row: CompactedRow = to_insert.1.into();
                self.middle.insert(to_insert.0.clone(), row.clone());
                staging.insert(to_insert.0, row);
            }
            Ordering::Greater => {
                // the row is in high, do nothing
            }
        }

        Ok(())
    }
}

/// Used to build the diff between the cache contents before and after applying an input chunk,
/// for the `TopNCache` of one group. It should be maintained whenever an entry is inserted into
/// or deleted from the `middle` cache.
#[derive(Debug, Default)]
pub struct TopNStaging {
    to_delete: BTreeMap<CacheKey, CompactedRow>,
    to_insert: BTreeMap<CacheKey, CompactedRow>,
    to_update: BTreeMap<CacheKey, (CompactedRow, CompactedRow)>,
}

impl TopNStaging {
    pub fn new() -> Self {
        Self::default()
    }

    /// Insert a row into the staging changes. This method must be called when a row is
    /// added to the `middle` cache.
    fn insert(&mut self, cache_key: CacheKey, row: CompactedRow) {
        if let Some(old_row) = self.to_delete.remove(&cache_key) {
            if old_row != row {
                self.to_update.insert(cache_key, (old_row, row));
            }
        } else {
            self.to_insert.insert(cache_key, row);
        }
    }

    /// Delete a row from the staging changes. This method must be called when a row is
    /// removed from the `middle` cache.
    fn delete(&mut self, cache_key: CacheKey, row: CompactedRow) {
        if self.to_insert.remove(&cache_key).is_some() {
            // do nothing more
        } else if let Some((old_row, _)) = self.to_update.remove(&cache_key) {
            self.to_delete.insert(cache_key, old_row);
        } else {
            self.to_delete.insert(cache_key, row);
        }
    }

    /// Get the count of effective changes in the staging.
    pub fn len(&self) -> usize {
        self.to_delete.len() + self.to_insert.len() + self.to_update.len()
    }

    /// Check if the staging is empty.
    pub fn is_empty(&self) -> bool {
        self.to_delete.is_empty() && self.to_insert.is_empty() && self.to_update.is_empty()
    }

    /// Iterate over the changes in the staging.
    pub fn into_changes(self) -> impl Iterator<Item = (Op, CompactedRow)> {
        #[cfg(debug_assertions)]
        {
            let keys = self
                .to_delete
                .keys()
                .chain(self.to_insert.keys())
                .chain(self.to_update.keys())
                .unique()
                .count();
            assert_eq!(
                keys,
                self.to_delete.len() + self.to_insert.len() + self.to_update.len(),
                "should not have duplicate keys with different operations",
            );
        }

        // We expect one `CacheKey` to appear at most once in the staging, and the order of
        // the outputs of `TopN` doesn't really matter, so we can simply chain the three maps.
        // Although the output order is not important, we still ensure that `Delete`s are emitted
        // before `Insert`s, so that we can avoid temporary violation of the `LIMIT` constraint.
        self.to_update
            .into_values()
            .flat_map(|(old_row, new_row)| {
                [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)]
            })
            .chain(self.to_delete.into_values().map(|row| (Op::Delete, row)))
            .chain(self.to_insert.into_values().map(|row| (Op::Insert, row)))
    }

    /// Iterate over the changes in the staging, and deserialize the rows.
    pub fn into_deserialized_changes(
        self,
        deserializer: &RowDeserializer,
    ) -> impl Iterator<Item = StreamExecutorResult<(Op, OwnedRow)>> + '_ {
        self.into_changes()
            .map(|(op, row)| Ok((op, deserializer.deserialize(row.row.as_ref())?)))
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::types::DataType;

    use super::*;

    #[test]
    fn test_topn_cache_new_uses_default_min_capacity() {
        let cache = TopNCache::<false>::new(0, 5, vec![DataType::Int32]);
        assert_eq!(cache.high_cache_capacity, TOPN_CACHE_MIN_CAPACITY);
    }
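
    #[test]
    fn test_topn_cache_insert_without_ties() {
        // A minimal sketch exercising `TopNCacheTrait::insert` purely in memory. The cache
        // keys here are hand-crafted `(order_by, pk)` byte pairs rather than real
        // memcomparable encodings, which is enough to drive the pure cache logic.
        let mut cache = TopNCache::<false>::new(0, 2, vec![DataType::Int32]);
        let mut staging = TopNStaging::new();
        let row = |i: i32| OwnedRow::new(vec![Some(i.into())]);

        // the first two rows fill the middle cache and are staged as inserts
        cache.insert((vec![1], vec![]), row(1), &mut staging);
        cache.insert((vec![2], vec![]), row(2), &mut staging);
        assert_eq!(cache.middle.len(), 2);
        assert_eq!(staging.len(), 2);

        // a larger key is out of the top-2: the middle cache and staging stay unchanged
        cache.insert((vec![3], vec![]), row(3), &mut staging);
        assert_eq!(cache.middle.len(), 2);
        assert_eq!(staging.len(), 2);
    }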

    #[test]
    fn test_topn_cache_with_custom_min_capacity() {
        let custom_min_capacity = 25;
        let cache =
            TopNCache::<false>::with_min_capacity(0, 5, vec![DataType::Int32], custom_min_capacity);
        assert_eq!(cache.high_cache_capacity, custom_min_capacity);
    }

    #[test]
    fn test_topn_cache_high_capacity_factor_respected() {
        let custom_min_capacity = 1;
        let offset = 2;
        let limit = 5;
        let expected_capacity = (offset + limit) * TOPN_CACHE_HIGH_CAPACITY_FACTOR;

        let cache = TopNCache::<false>::with_min_capacity(
            offset,
            limit,
            vec![DataType::Int32],
            custom_min_capacity,
        );
        assert_eq!(cache.high_cache_capacity, expected_capacity);
    }
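
    #[test]
    fn test_topn_cache_with_ties_keeps_ties_in_middle() {
        // A minimal sketch of the `WITH_TIES` semantics: ties of the last in-limit row are
        // kept in the middle cache, so its size may exceed `limit`. Keys are hand-crafted
        // `(order_by, pk)` byte pairs, not real memcomparable encodings.
        let mut cache = TopNCache::<true>::new(0, 2, vec![DataType::Int32]);
        let mut staging = TopNStaging::new();
        let row = |i: i32| OwnedRow::new(vec![Some(i.into())]);

        cache.insert((vec![2], vec![0]), row(2), &mut staging);
        cache.insert((vec![2], vec![1]), row(2), &mut staging);
        // a third row with the same sort key ties with the last row and is kept
        cache.insert((vec![2], vec![2]), row(2), &mut staging);
        assert_eq!(cache.middle.len(), 3);

        // a smaller row can't evict the ties yet: doing so would leave fewer than
        // `limit` rows in the middle cache
        cache.insert((vec![1], vec![0]), row(1), &mut staging);
        assert_eq!(cache.middle.len(), 4);

        // one more small row makes the ties evictable, shrinking middle back to `limit`
        cache.insert((vec![0], vec![0]), row(0), &mut staging);
        assert_eq!(cache.middle.len(), 2);
    }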

    #[test]
    fn test_topn_cache_min_capacity_takes_precedence_when_larger() {
        let large_min_capacity = 100;
        let offset = 0;
        let limit = 5;
        let expected_from_formula = (offset + limit) * TOPN_CACHE_HIGH_CAPACITY_FACTOR; // Should be 10

        let cache = TopNCache::<false>::with_min_capacity(
            offset,
            limit,
            vec![DataType::Int32],
            large_min_capacity,
        );
        assert_eq!(cache.high_cache_capacity, large_min_capacity);
        assert!(cache.high_cache_capacity > expected_from_formula);
    }
964}