risingwave_stream/executor/top_n/
top_n_cache.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::BTreeMap;
17use std::fmt::Debug;
18use std::future::Future;
19
20use itertools::Itertools;
21use risingwave_common::array::{Op, RowRef};
22use risingwave_common::row::{CompactedRow, OwnedRow, Row, RowDeserializer, RowExt};
23use risingwave_common::types::DataType;
24use risingwave_common_estimate_size::EstimateSize;
25use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
26use risingwave_storage::StateStore;
27
28use super::{GroupKey, ManagedTopNState};
29use crate::consistency::{consistency_error, enable_strict_consistency};
30use crate::executor::error::StreamExecutorResult;
31
/// `CacheKey` is composed of `(order_by, remaining columns of pk)`.
pub type CacheKey = (Vec<u8>, Vec<u8>);
/// Memory-tracked ordered map from [`CacheKey`] to the compacted full row.
pub type Cache = EstimatedBTreeMap<CacheKey, CompactedRow>;

/// The high cache capacity is `(offset + limit) * this factor` (saturating), see `TopNCache::new`.
const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
/// Lower bound on the high cache capacity, so tiny `LIMIT`s still get a useful cache.
const TOPN_CACHE_MIN_CAPACITY: usize = 10;
38
/// Cache for [`ManagedTopNState`].
///
/// The key in the maps [`CacheKey`] is `[ order_by + remaining columns of pk ]`. `group_key` is not
/// included.
///
/// # `WITH_TIES`
///
/// `WITH_TIES` supports the semantic of `FETCH FIRST n ROWS WITH TIES` and `RANK() <= n`.
///
/// `OFFSET m FETCH FIRST n ROWS WITH TIES` and `m <= RANK() <= n` are not supported now,
/// since they have different semantics.
pub struct TopNCache<const WITH_TIES: bool> {
    /// Rows in the range `[0, offset)`. Should always be synced with state table.
    /// `None` iff `offset == 0` (see `TopNCache::new`).
    pub low: Option<Cache>,

    /// Rows in the range `[offset, offset+limit)`. Should always be synced with state table.
    ///
    /// When `WITH_TIES` is true, it also stores ties for the last element,
    /// and thus the size can be larger than `limit`.
    pub middle: Cache,

    /// Cache of the beginning rows in the range `[offset+limit, ...)`.
    ///
    /// This is very similar to [`TopNStateCache`], which only caches the top-N rows in the table
    /// and only accepts new records that are less than the largest in the cache.
    ///
    /// When `WITH_TIES` is true, it guarantees that the ties of the last element are in the cache,
    /// and thus the size can be larger than `high_cache_capacity`.
    ///
    /// When the cache becomes empty, if the `table_row_count` is not matched, we need to view the cache
    /// as unsynced and refill it from the state table.
    ///
    /// TODO(rc): later we should reuse [`TopNStateCache`] here.
    ///
    /// [`TopNStateCache`]: crate::common::state_cache::TopNStateCache
    pub high: Cache,
    /// Soft capacity of `high`; derived from `offset`, `limit` and
    /// `TOPN_CACHE_HIGH_CAPACITY_FACTOR` in `TopNCache::new`.
    pub high_cache_capacity: usize,

    pub offset: usize,
    /// Assumption: `limit != 0`
    pub limit: usize,

    /// Number of rows corresponding to the current group.
    /// This is a nice-to-have information. `None` means we don't know the row count,
    /// but it doesn't prevent us from working correctly.
    table_row_count: Option<usize>,

    /// Data types for the full row.
    ///
    /// For debug formatting only.
    data_types: Vec<DataType>,
}
91
92impl<const WITH_TIES: bool> EstimateSize for TopNCache<WITH_TIES> {
93    fn estimated_heap_size(&self) -> usize {
94        self.low.estimated_heap_size()
95            + self.middle.estimated_heap_size()
96            + self.high.estimated_heap_size()
97    }
98}
99
impl<const WITH_TIES: bool> Debug for TopNCache<WITH_TIES> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Header with the scalar configuration of the cache.
        write!(
            f,
            "TopNCache {{\n  offset: {}, limit: {}, high_cache_capacity: {},\n",
            self.offset, self.limit, self.high_cache_capacity
        )?;

        /// Dump one cache section: each entry as `cache_key, deserialized row`,
        /// one per line, indented by four spaces.
        fn format_cache(
            f: &mut std::fmt::Formatter<'_>,
            cache: &Cache,
            data_types: &[DataType],
        ) -> std::fmt::Result {
            if cache.is_empty() {
                return write!(f, "    <empty>");
            }
            write!(
                f,
                "    {}",
                cache
                    .iter()
                    .format_with("\n    ", |item, f| f(&format_args!(
                        "{:?}, {}",
                        item.0,
                        // `data_types` describes the full row, so deserialization
                        // here is expected to succeed for any cached row.
                        item.1.deserialize(data_types).unwrap().display(),
                    )))
            )
        }

        writeln!(f, "  low:")?;
        if let Some(low) = &self.low {
            format_cache(f, low, &self.data_types)?;
        } else {
            // `low` is `None` when `offset == 0`.
            writeln!(f, "    <none>")?;
        }
        writeln!(f, "\n  middle:")?;
        format_cache(f, &self.middle, &self.data_types)?;
        writeln!(f, "\n  high:")?;
        format_cache(f, &self.high, &self.data_types)?;

        write!(f, "\n}}")?;
        Ok(())
    }
}
144
/// This trait is used as a bound. It is needed since
/// `TopNCache::<true>::f` and `TopNCache::<false>::f`
/// don't imply `TopNCache::<WITH_TIES>::f`.
pub trait TopNCacheTrait {
    /// Insert input row to corresponding cache range according to its order key.
    ///
    /// Changes in `self.middle` are recorded to `staging`, which will be
    /// used to generate messages to be sent to downstream operators.
    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging);

    /// Delete input row from the cache.
    ///
    /// Changes in `self.middle` are recorded to `staging`, which will be
    /// used to generate messages to be sent to downstream operators.
    ///
    /// Because we may need to refill data from the state table to `self.high` during the delete
    /// operation, we need to pass in `group_key` and `managed_state` to do a prefix
    /// scan of the state table.
    fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row + Send,
        staging: &mut TopNStaging,
    ) -> impl Future<Output = StreamExecutorResult<()>> + Send;
}
172
173impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
174    /// `data_types` -- Data types for the full row.
175    pub fn new(offset: usize, limit: usize, data_types: Vec<DataType>) -> Self {
176        assert!(limit > 0);
177        if WITH_TIES {
178            // It's trickier to support.
179            // Also `OFFSET WITH TIES` has different semantic with `a < RANK() < b`
180            assert!(offset == 0, "OFFSET is not supported with WITH TIES");
181        }
182        let high_cache_capacity = offset
183            .checked_add(limit)
184            .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
185            .unwrap_or(usize::MAX)
186            .max(TOPN_CACHE_MIN_CAPACITY);
187        Self {
188            low: if offset > 0 { Some(Cache::new()) } else { None },
189            middle: Cache::new(),
190            high: Cache::new(),
191            high_cache_capacity,
192            offset,
193            limit,
194            table_row_count: None,
195            data_types,
196        }
197    }
198
199    /// Clear the cache. After this, the cache must be `init` again before use.
200    #[allow(dead_code)]
201    pub fn clear(&mut self) {
202        self.low.as_mut().map(Cache::clear);
203        self.middle.clear();
204        self.high.clear();
205    }
206
207    /// Get total count of entries in the cache.
208    pub fn len(&self) -> usize {
209        self.low.as_ref().map(Cache::len).unwrap_or(0) + self.middle.len() + self.high.len()
210    }
211
212    pub(super) fn update_table_row_count(&mut self, table_row_count: usize) {
213        self.table_row_count = Some(table_row_count)
214    }
215
216    pub fn low_is_full(&self) -> bool {
217        if let Some(low) = &self.low {
218            assert!(low.len() <= self.offset);
219            let full = low.len() == self.offset;
220            if !full {
221                assert!(self.middle.is_empty());
222                assert!(self.high.is_empty());
223            }
224            full
225        } else {
226            true
227        }
228    }
229
230    pub fn middle_is_full(&self) -> bool {
231        // For WITH_TIES, the middle cache can exceed the capacity.
232        if !WITH_TIES {
233            assert!(
234                self.middle.len() <= self.limit,
235                "the middle cache exceeds the capacity\n{self:?}"
236            );
237        }
238        let full = self.middle.len() >= self.limit;
239        if full {
240            assert!(self.low_is_full());
241        } else {
242            assert!(
243                self.high.is_empty(),
244                "the high cache is not empty when middle cache is not full:\n{self:?}"
245            );
246        }
247        full
248    }
249
250    pub fn high_is_full(&self) -> bool {
251        // For WITH_TIES, the high cache can exceed the capacity.
252        if !WITH_TIES {
253            assert!(self.high.len() <= self.high_cache_capacity);
254        }
255        self.high.len() >= self.high_cache_capacity
256    }
257
258    fn high_is_synced(&self) -> bool {
259        if !self.high.is_empty() {
260            true
261        } else {
262            // check if table row count matches
263            self.table_row_count
264                .map(|n| n == self.len())
265                .unwrap_or(false)
266        }
267    }
268
269    fn last_cache_key_before_high(&self) -> Option<&CacheKey> {
270        let middle_last_key = self.middle.last_key_value().map(|(k, _)| k);
271        middle_last_key.or_else(|| {
272            self.low
273                .as_ref()
274                .and_then(Cache::last_key_value)
275                .map(|(k, _)| k)
276        })
277    }
278}
279
impl TopNCacheTrait for TopNCache<false> {
    /// Insert a row into the proper cache (low → middle → high), cascading evicted
    /// entries downwards and recording middle-cache changes into `staging`.
    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging) {
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count += 1;
        }

        // `to_insert` starts as the incoming entry; if it displaces the last entry
        // of a lower cache, it becomes that displaced entry for the next stage.
        let mut to_insert = (cache_key, (&row).into());
        let mut is_last_of_lower_cache = false; // for saving one key comparison

        let low_is_full = self.low_is_full();
        if let Some(low) = &mut self.low {
            // try insert into low cache

            if !low_is_full {
                low.insert(to_insert.0, to_insert.1);
                return;
            }

            // low cache is full
            let low_last = low.last_entry().unwrap();
            if &to_insert.0 < low_last.key() {
                // make space for the new entry
                let low_last = low_last.remove_entry();
                low.insert(to_insert.0, to_insert.1);
                to_insert = low_last; // move the last entry to the middle cache
                is_last_of_lower_cache = true;
            }
        }

        // try insert into middle cache

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return;
        }

        // middle cache is full
        let middle_last = self.middle.last_entry().unwrap();
        if is_last_of_lower_cache || &to_insert.0 < middle_last.key() {
            // make space for the new entry
            let middle_last = middle_last.remove_entry();
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());

            staging.delete(middle_last.0.clone(), middle_last.1.clone());
            staging.insert(to_insert.0, to_insert.1);

            to_insert = middle_last; // move the last entry to the high cache
            is_last_of_lower_cache = true;
        }

        // try insert into high cache

        // The logic is a bit different from the other two caches, because high cache is not
        // guaranteed to be fully synced with the "high part" of the table.

        if is_last_of_lower_cache || self.high_is_synced() {
            // For `is_last_of_lower_cache`, an obvious observation is that the key to insert is
            // always smaller than any key in the high part of the table.

            if self.high.is_empty() {
                // if high cache is empty, we can insert directly
                self.high.insert(to_insert.0, to_insert.1);
                return;
            }

            let high_is_full = self.high_is_full();
            let high_last = self.high.last_entry().unwrap();

            if is_last_of_lower_cache || &to_insert.0 < high_last.key() {
                // we can only insert if the key is smaller than the largest key in the high cache
                if high_is_full {
                    // make space for the new entry
                    high_last.remove_entry();
                }
                self.high.insert(to_insert.0, to_insert.1);
            }
        }
    }

    /// Delete a row from whichever cache it belongs to, backfilling middle from high
    /// (and low from middle) to keep the ranges contiguous; middle-cache changes go
    /// into `staging`.
    async fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row + Send,
        staging: &mut TopNStaging,
    ) -> StreamExecutorResult<()> {
        if !enable_strict_consistency() && self.table_row_count == Some(0) {
            // If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we
            // should not panic. Instead, we pretend that we don't know about the actual row count.
            consistency_error!("table row count is 0, but we receive a DELETE operation");
            self.table_row_count = None;
        }
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count -= 1;
        }

        if self.middle_is_full() && &cache_key > self.middle.last_key_value().unwrap().0 {
            // the row is in high
            self.high.remove(&cache_key);
        } else if self.low_is_full()
            && self
                .low
                .as_ref()
                .map(|low| &cache_key > low.last_key_value().unwrap().0)
                .unwrap_or(
                    true, // if low is None, `cache_key` should be in middle
                )
        {
            // the row is in middle
            let removed = self.middle.remove(&cache_key);
            staging.delete(cache_key.clone(), (&row).into());

            if removed.is_none() {
                // the middle cache should always be synced, if the key is not found, then it also doesn't
                // exist in the state table
                consistency_error!(
                    ?group_key,
                    ?cache_key,
                    "cache key not found in middle cache"
                );
                return Ok(());
            }

            // refill the high cache if it's not synced
            if !self.high_is_synced() {
                self.high.clear();
                managed_state
                    .fill_high_cache(
                        group_key,
                        self,
                        self.last_cache_key_before_high().cloned(),
                        self.high_cache_capacity,
                    )
                    .await?;
            }

            // bring one element, if any, from high cache to middle cache
            if !self.high.is_empty() {
                let high_first = self.high.pop_first().unwrap();
                self.middle
                    .insert(high_first.0.clone(), high_first.1.clone());
                staging.insert(high_first.0, high_first.1);
            }

            assert!(self.high.is_empty() || self.middle.len() == self.limit);
        } else {
            // the row is in low
            let low = self.low.as_mut().unwrap();
            let removed = low.remove(&cache_key);

            if removed.is_none() {
                // the low cache should always be synced, if the key is not found, then it also doesn't
                // exist in the state table
                consistency_error!(?group_key, ?cache_key, "cache key not found in low cache");
                return Ok(());
            }

            // bring one element, if any, from middle cache to low cache
            if !self.middle.is_empty() {
                let middle_first = self.middle.pop_first().unwrap();
                staging.delete(middle_first.0.clone(), middle_first.1.clone());
                low.insert(middle_first.0, middle_first.1);

                // fill the high cache if it's not synced
                if !self.high_is_synced() {
                    self.high.clear();
                    managed_state
                        .fill_high_cache(
                            group_key,
                            self,
                            self.last_cache_key_before_high().cloned(),
                            self.high_cache_capacity,
                        )
                        .await?;
                }

                // bring one element, if any, from high cache to middle cache
                if !self.high.is_empty() {
                    let high_first = self.high.pop_first().unwrap();
                    self.middle
                        .insert(high_first.0.clone(), high_first.1.clone());
                    staging.insert(high_first.0, high_first.1);
                }
            }
        }

        Ok(())
    }
}
471
472impl TopNCacheTrait for TopNCache<true> {
473    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging) {
474        if let Some(row_count) = self.table_row_count.as_mut() {
475            *row_count += 1;
476        }
477
478        assert!(
479            self.low.is_none(),
480            "Offset is not supported yet for WITH TIES, so low cache should be None"
481        );
482
483        let to_insert: (CacheKey, CompactedRow) = (cache_key, (&row).into());
484
485        // try insert into middle cache
486
487        if !self.middle_is_full() {
488            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
489            staging.insert(to_insert.0.clone(), to_insert.1);
490            return;
491        }
492
493        // middle cache is full
494
495        let to_insert_sort_key = &(to_insert.0).0;
496        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();
497
498        match to_insert_sort_key.cmp(&middle_last_sort_key) {
499            Ordering::Less => {
500                // the row is in middle
501                let n_ties_of_last = self
502                    .middle
503                    .range((middle_last_sort_key.clone(), vec![])..)
504                    .count();
505                // We evict the last row and its ties only if the number of remaining rows still is
506                // still larger than limit, i.e., there are limit-1 other rows.
507                //
508                // e.g., limit = 3, [1,1,1,1]
509                // insert 0 -> [0,1,1,1,1]
510                // insert 0 -> [0,0,1,1,1,1]
511                // insert 0 -> [0,0,0]
512                if self.middle.len() + 1 - n_ties_of_last >= self.limit {
513                    // Middle will be full without the last element and its ties after insertion.
514                    // Let's move the last element and its ties to high cache first.
515                    while let Some(middle_last) = self.middle.last_entry()
516                        && middle_last.key().0 == middle_last_sort_key
517                    {
518                        let middle_last = middle_last.remove_entry();
519                        staging.delete(middle_last.0.clone(), middle_last.1.clone());
520                        // we can blindly move entries from middle cache to high cache no matter high cache is synced or not
521                        self.high.insert(middle_last.0, middle_last.1);
522                    }
523                }
524                if self.high.len() > self.high_cache_capacity {
525                    // evict some entries from high cache if it exceeds the capacity
526                    let high_last = self.high.pop_last().unwrap();
527                    let high_last_sort_key = (high_last.0).0;
528                    // Remove all ties of the last element in high cache, for the sake of simplicity.
529                    // This may cause repeatedly refill the high cache if number of ties is large.
530                    self.high.retain(|k, _| k.0 != high_last_sort_key);
531                }
532
533                self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
534                staging.insert(to_insert.0, to_insert.1);
535            }
536            Ordering::Equal => {
537                // the row is in middle and is a tie of the last row
538                self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
539                staging.insert(to_insert.0, to_insert.1);
540            }
541            Ordering::Greater => {
542                // the row is in high
543
544                if self.high_is_synced() {
545                    // only insert into high cache if it is synced
546
547                    if self.high.is_empty() {
548                        // if high cache is empty, we can insert directly
549                        self.high.insert(to_insert.0, to_insert.1);
550                        return;
551                    }
552
553                    if to_insert_sort_key <= &self.high.last_key().unwrap().0 {
554                        // We can only insert if the key is <= the largest key in the high cache.
555                        // Note that we have all ties of the last element in the high cache, so we can
556                        // safely compare only the sort key.
557                        self.high.insert(to_insert.0, to_insert.1);
558                    }
559
560                    if self.high.len() > self.high_cache_capacity {
561                        // evict some entries from high cache if it exceeds the capacity
562                        let high_last = self.high.pop_last().unwrap();
563                        let high_last_sort_key = (high_last.0).0;
564                        // Remove all ties of the last element in high cache, for the sake of simplicity.
565                        // This may cause repeatedly refill the high cache if number of ties is large.
566                        self.high.retain(|k, _| k.0 != high_last_sort_key);
567                    }
568                }
569            }
570        }
571    }
572
573    async fn delete<S: StateStore>(
574        &mut self,
575        group_key: Option<impl GroupKey>,
576        managed_state: &mut ManagedTopNState<S>,
577        cache_key: CacheKey,
578        row: impl Row + Send,
579        staging: &mut TopNStaging,
580    ) -> StreamExecutorResult<()> {
581        if !enable_strict_consistency() && self.table_row_count == Some(0) {
582            // If strict consistency is disabled, and we receive a `DELETE` but the row count is 0, we
583            // should not panic. Instead, we pretend that we don't know about the actually row count.
584            self.table_row_count = None;
585        }
586        if let Some(row_count) = self.table_row_count.as_mut() {
587            *row_count -= 1;
588        }
589
590        assert!(
591            self.low.is_none(),
592            "Offset is not supported yet for WITH TIES, so low cache should be None"
593        );
594
595        if self.middle.is_empty() {
596            consistency_error!(
597                ?group_key,
598                ?cache_key,
599                "middle cache is empty, but we receive a DELETE operation"
600            );
601            staging.delete(cache_key, (&row).into());
602            return Ok(());
603        }
604
605        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();
606
607        let to_delete_sort_key = cache_key.0.clone();
608        if to_delete_sort_key > middle_last_sort_key {
609            // the row is in high
610            self.high.remove(&cache_key);
611        } else {
612            // the row is in middle
613            self.middle.remove(&cache_key);
614            staging.delete(cache_key.clone(), (&row).into());
615            if self.middle.len() >= self.limit {
616                // this can happen when there are ties
617                return Ok(());
618            }
619
620            // refill the high cache if it's not synced
621            if !self.high_is_synced() {
622                managed_state
623                    .fill_high_cache(
624                        group_key,
625                        self,
626                        self.last_cache_key_before_high().cloned(),
627                        self.high_cache_capacity,
628                    )
629                    .await?;
630            }
631
632            // bring the first element and its ties, if any, from high cache to middle cache
633            if !self.high.is_empty() {
634                let high_first = self.high.pop_first().unwrap();
635                let high_first_sort_key = (high_first.0).0.clone();
636                assert!(high_first_sort_key > middle_last_sort_key);
637
638                self.middle
639                    .insert(high_first.0.clone(), high_first.1.clone());
640                staging.insert(high_first.0, high_first.1);
641
642                for (cache_key, row) in self.high.extract_if(|k, _| k.0 == high_first_sort_key) {
643                    self.middle.insert(cache_key.clone(), row.clone());
644                    staging.insert(cache_key, row);
645                }
646            }
647        }
648
649        Ok(())
650    }
651}
652
/// Similar to [`TopNCacheTrait`], but for append-only TopN.
pub trait AppendOnlyTopNCacheTrait {
    /// Insert input row to corresponding cache range according to its order key.
    ///
    /// Changes in `self.middle` are recorded to `staging`, which will be
    /// used to generate messages to be sent to downstream operators.
    ///
    /// `managed_state` is required because different from normal TopN, append-only TopN
    /// doesn't insert all rows into the state table.
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()>;
}
671
impl AppendOnlyTopNCacheTrait for TopNCache<false> {
    /// Insert a row for append-only TopN: rows beyond `[0, offset + limit)` are
    /// discarded immediately (never written to the state table), and a row evicted
    /// from `middle` is deleted from the state table since there is no high part.
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()> {
        // The row ranks beyond `offset + limit`; since input is append-only, it can
        // never enter the result, so skip both the cache and the state table.
        if self.middle_is_full() && &cache_key >= self.middle.last_key().unwrap() {
            return Ok(());
        }
        managed_state.insert(row_ref);

        // insert input row into corresponding cache according to its sort key
        let mut to_insert = (cache_key, row_ref.into());

        let low_is_full = self.low_is_full();
        if let Some(low) = &mut self.low {
            // try insert into low cache

            if !low_is_full {
                low.insert(to_insert.0, to_insert.1);
                return Ok(());
            }

            // low cache is full
            let low_last = low.last_entry().unwrap();
            if &to_insert.0 < low_last.key() {
                // make space for the new entry
                let low_last = low_last.remove_entry();
                low.insert(to_insert.0, to_insert.1);
                to_insert = low_last; // move the last entry to the middle cache
            }
        }

        // try insert into middle cache

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return Ok(());
        }

        // The row must be in the range of [offset, offset+limit).
        // the largest row in `cache.middle` needs to be removed.
        let middle_last = self.middle.pop_last().unwrap();
        debug_assert!(to_insert.0 < middle_last.0);
        managed_state.delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
        staging.delete(middle_last.0, middle_last.1);

        self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
        staging.insert(to_insert.0, to_insert.1);

        // Unlike normal topN, append only topN does not use the high part of the cache.

        Ok(())
    }
}
731
impl AppendOnlyTopNCacheTrait for TopNCache<true> {
    /// Insert a row for append-only TopN with `WITH_TIES`: ties of the last element
    /// stay in `middle`; rows ranking strictly beyond the last sort key are dropped,
    /// and evicted rows are deleted from the state table (no high part is kept).
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()> {
        assert!(
            self.low.is_none(),
            "Offset is not supported yet for WITH TIES, so low cache should be empty"
        );

        let to_insert = (cache_key, row_ref);

        // try insert into middle cache

        if !self.middle_is_full() {
            managed_state.insert(to_insert.1);
            let row: CompactedRow = to_insert.1.into();
            self.middle.insert(to_insert.0.clone(), row.clone());
            staging.insert(to_insert.0, row);
            return Ok(());
        }

        // middle cache is full

        // Compare only by the `order_by` part of the cache key to detect ties.
        let to_insert_sort_key = &(to_insert.0).0;
        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();

        match to_insert_sort_key.cmp(&middle_last_sort_key) {
            Ordering::Less => {
                // the row is in middle
                let n_ties_of_last = self
                    .middle
                    .range((middle_last_sort_key.clone(), vec![])..)
                    .count();
                // We evict the last row and its ties only if the number of remaining rows is
                // still larger than limit, i.e., there are limit-1 other rows.
                //
                // e.g., limit = 3, [1,1,1,1]
                // insert 0 -> [0,1,1,1,1]
                // insert 0 -> [0,0,1,1,1,1]
                // insert 0 -> [0,0,0]
                if self.middle.len() + 1 - n_ties_of_last >= self.limit {
                    // middle will be full without the last element and its ties after insertion
                    while let Some(middle_last) = self.middle.last_entry()
                        && middle_last.key().0 == middle_last_sort_key
                    {
                        let middle_last = middle_last.remove_entry();
                        // we don't need to maintain the high part so just delete it from state table
                        managed_state
                            .delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
                        staging.delete(middle_last.0, middle_last.1);
                    }
                }

                managed_state.insert(to_insert.1);
                let row: CompactedRow = to_insert.1.into();
                self.middle.insert(to_insert.0.clone(), row.clone());
                staging.insert(to_insert.0, row);
            }
            Ordering::Equal => {
                // the row is in middle and is a tie of the last row
                managed_state.insert(to_insert.1);
                let row: CompactedRow = to_insert.1.into();
                self.middle.insert(to_insert.0.clone(), row.clone());
                staging.insert(to_insert.0, row);
            }
            Ordering::Greater => {
                // the row is in high, do nothing
            }
        }

        Ok(())
    }
}
810
/// Used to build diff between before and after applying an input chunk, for `TopNCache` (of one group).
/// It should be maintained when an entry is inserted or deleted from the `middle` cache.
#[derive(Debug, Default)]
pub struct TopNStaging {
    /// Rows removed from the `middle` cache (net effect over the chunk), keyed by cache key.
    to_delete: BTreeMap<CacheKey, CompactedRow>,
    /// Rows newly added to the `middle` cache (net effect over the chunk).
    to_insert: BTreeMap<CacheKey, CompactedRow>,
    /// Rows whose value changed within the chunk: `(old_row, new_row)` pairs.
    to_update: BTreeMap<CacheKey, (CompactedRow, CompactedRow)>,
}
819
820impl TopNStaging {
821    pub fn new() -> Self {
822        Self::default()
823    }
824
825    /// Insert a row into the staging changes. This method must be called when a row is
826    /// added to the `middle` cache.
827    fn insert(&mut self, cache_key: CacheKey, row: CompactedRow) {
828        if let Some(old_row) = self.to_delete.remove(&cache_key) {
829            if old_row != row {
830                self.to_update.insert(cache_key, (old_row, row));
831            }
832        } else {
833            self.to_insert.insert(cache_key, row);
834        }
835    }
836
837    /// Delete a row from the staging changes. This method must be called when a row is
838    /// removed from the `middle` cache.
839    fn delete(&mut self, cache_key: CacheKey, row: CompactedRow) {
840        if self.to_insert.remove(&cache_key).is_some() {
841            // do nothing more
842        } else if let Some((old_row, _)) = self.to_update.remove(&cache_key) {
843            self.to_delete.insert(cache_key, old_row);
844        } else {
845            self.to_delete.insert(cache_key, row);
846        }
847    }
848
849    /// Get the count of effective changes in the staging.
850    pub fn len(&self) -> usize {
851        self.to_delete.len() + self.to_insert.len() + self.to_update.len()
852    }
853
854    /// Check if the staging is empty.
855    pub fn is_empty(&self) -> bool {
856        self.to_delete.is_empty() && self.to_insert.is_empty() && self.to_update.is_empty()
857    }
858
859    /// Iterate over the changes in the staging.
860    pub fn into_changes(self) -> impl Iterator<Item = (Op, CompactedRow)> {
861        #[cfg(debug_assertions)]
862        {
863            let keys = self
864                .to_delete
865                .keys()
866                .chain(self.to_insert.keys())
867                .chain(self.to_update.keys())
868                .unique()
869                .count();
870            assert_eq!(
871                keys,
872                self.to_delete.len() + self.to_insert.len() + self.to_update.len(),
873                "should not have duplicate keys with different operations",
874            );
875        }
876
877        // We expect one `CacheKey` to appear at most once in the staging, and, the order of
878        // the outputs of `TopN` doesn't really matter, so we can simply chain the three maps.
879        // Although the output order is not important, we still ensure that `Delete`s are emitted
880        // before `Insert`s, so that we can avoid temporary violation of the `LIMIT` constraint.
881        self.to_update
882            .into_values()
883            .flat_map(|(old_row, new_row)| {
884                [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)]
885            })
886            .chain(self.to_delete.into_values().map(|row| (Op::Delete, row)))
887            .chain(self.to_insert.into_values().map(|row| (Op::Insert, row)))
888    }
889
890    /// Iterate over the changes in the staging, and deserialize the rows.
891    pub fn into_deserialized_changes(
892        self,
893        deserializer: &RowDeserializer,
894    ) -> impl Iterator<Item = StreamExecutorResult<(Op, OwnedRow)>> + '_ {
895        self.into_changes()
896            .map(|(op, row)| Ok((op, deserializer.deserialize(row.row.as_ref())?)))
897    }
898}