1use std::cmp::Ordering;
16use std::fmt::Debug;
17use std::future::Future;
18
19use itertools::Itertools;
20use risingwave_common::array::RowRef;
21use risingwave_common::array::stream_record::Record;
22use risingwave_common::row::{CompactedRow, OwnedRow, Row, RowDeserializer, RowExt};
23use risingwave_common::types::DataType;
24use risingwave_common_estimate_size::EstimateSize;
25use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
26use risingwave_storage::StateStore;
27
28use super::{GroupKey, ManagedTopNState};
29use crate::common::change_buffer::ChangeBuffer;
30use crate::consistency::{consistency_error, enable_strict_consistency};
31use crate::executor::error::StreamExecutorResult;
32
/// Key under which rows are ordered in the cache:
/// `(serialized order-by columns, serialized remaining pk columns)`.
/// Lexicographic comparison on the tuple yields the TopN order, with the pk part
/// acting as a tie-breaker; `.0` alone is the "sort key" used for WITH TIES logic.
pub type CacheKey = (Vec<u8>, Vec<u8>);
/// Ordered in-memory map from [`CacheKey`] to the compacted row, with heap-size
/// accounting for cache eviction/metrics.
pub type Cache = EstimatedBTreeMap<CacheKey, CompactedRow>;

/// The high cache may hold up to `(offset + limit) * this factor` rows, so there is
/// slack to refill `middle` on deletions without reading the state table.
const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
/// Lower bound on the high cache capacity, so tiny `offset + limit` values still
/// get a useful look-ahead buffer.
const TOPN_CACHE_MIN_CAPACITY: usize = 10;
39
/// In-memory cache for a (possibly per-group) TopN executor. Cached rows are kept in
/// three ordered segments by rank:
/// `[0, offset)` in `low`, `[offset, offset + limit)` in `middle` (the rows emitted
/// downstream), and everything after that — up to a capacity bound — in `high`.
pub struct TopNCache<const WITH_TIES: bool> {
    /// Rows ranked in `[0, offset)`. `None` when `offset == 0`; always `None` for
    /// WITH TIES, since OFFSET is asserted unsupported there.
    pub low: Option<Cache>,

    /// Rows ranked in `[offset, offset + limit)` — the output rows. With WITH TIES it
    /// may hold more than `limit` rows (ties of the last sort key stay together).
    pub middle: Cache,

    /// Rows ranked after `middle`, bounded by `high_cache_capacity`. Used to refill
    /// `middle` on deletion without a state-table read when possible.
    pub high: Cache,
    /// Capacity bound of the `high` segment (see `with_min_capacity`).
    pub high_cache_capacity: usize,

    /// The `OFFSET` clause value.
    pub offset: usize,
    /// The `LIMIT` clause value; asserted `> 0` at construction.
    pub limit: usize,

    /// Latest known total row count of the state table for this group, if any.
    /// Maintained on insert/delete and used by `high_is_synced` to tell whether the
    /// cache already holds every table row.
    table_row_count: Option<usize>,

    /// Row data types, used to deserialize rows when `Debug`-formatting the cache.
    data_types: Vec<DataType>,
}
92
93impl<const WITH_TIES: bool> EstimateSize for TopNCache<WITH_TIES> {
94 fn estimated_heap_size(&self) -> usize {
95 self.low.estimated_heap_size()
96 + self.middle.estimated_heap_size()
97 + self.high.estimated_heap_size()
98 }
99}
100
impl<const WITH_TIES: bool> Debug for TopNCache<WITH_TIES> {
    /// Pretty-prints the cache configuration and every cached row, deserializing rows
    /// with `self.data_types`.
    ///
    /// NOTE(review): `deserialize(..).unwrap()` below panics if a cached row does not
    /// match `data_types` — acceptable for a debug-only path, but formatting a
    /// corrupted cache will abort.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "TopNCache {{\n offset: {}, limit: {}, high_cache_capacity: {},\n",
            self.offset, self.limit, self.high_cache_capacity
        )?;

        // Local helper: print one cache segment, one `key, row` pair per line.
        fn format_cache(
            f: &mut std::fmt::Formatter<'_>,
            cache: &Cache,
            data_types: &[DataType],
        ) -> std::fmt::Result {
            if cache.is_empty() {
                return write!(f, " <empty>");
            }
            write!(
                f,
                " {}",
                cache
                    .iter()
                    .format_with("\n ", |item, f| f(&format_args!(
                        "{:?}, {}",
                        item.0,
                        item.1.deserialize(data_types).unwrap().display(),
                    )))
            )
        }

        writeln!(f, " low:")?;
        if let Some(low) = &self.low {
            format_cache(f, low, &self.data_types)?;
        } else {
            // No low cache means offset == 0.
            writeln!(f, " <none>")?;
        }
        writeln!(f, "\n middle:")?;
        format_cache(f, &self.middle, &self.data_types)?;
        writeln!(f, "\n high:")?;
        format_cache(f, &self.high, &self.data_types)?;

        write!(f, "\n}}")?;
        Ok(())
    }
}
145
/// Mutation interface of the (non-append-only) TopN cache, implemented separately for
/// the WITH TIES and non-WITH-TIES variants.
pub trait TopNCacheTrait {
    /// Insert a row into the cache, recording any resulting change to the output
    /// (`middle`) rows in `staging`.
    fn insert(&mut self, cache_key: CacheKey, row: impl Row, staging: &mut TopNStaging);

    /// Delete a row from the cache, recording output changes in `staging`.
    /// May read from `managed_state` to refill the `high` segment when it is not
    /// synced with the state table.
    fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row,
        staging: &mut TopNStaging,
    ) -> impl Future<Output = StreamExecutorResult<()>> + Send;
}
173
174impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
175 pub fn new(offset: usize, limit: usize, data_types: Vec<DataType>) -> Self {
178 Self::with_min_capacity(offset, limit, data_types, TOPN_CACHE_MIN_CAPACITY)
179 }
180
181 pub fn with_min_capacity(
184 offset: usize,
185 limit: usize,
186 data_types: Vec<DataType>,
187 min_capacity: usize,
188 ) -> Self {
189 assert!(limit > 0);
190 if WITH_TIES {
191 assert!(offset == 0, "OFFSET is not supported with WITH TIES");
194 }
195 let high_cache_capacity = offset
196 .checked_add(limit)
197 .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
198 .unwrap_or(usize::MAX)
199 .max(min_capacity);
200 Self {
201 low: if offset > 0 { Some(Cache::new()) } else { None },
202 middle: Cache::new(),
203 high: Cache::new(),
204 high_cache_capacity,
205 offset,
206 limit,
207 table_row_count: None,
208 data_types,
209 }
210 }
211
212 #[allow(dead_code)]
214 pub fn clear(&mut self) {
215 self.low.as_mut().map(Cache::clear);
216 self.middle.clear();
217 self.high.clear();
218 }
219
220 pub fn len(&self) -> usize {
222 self.low.as_ref().map(Cache::len).unwrap_or(0) + self.middle.len() + self.high.len()
223 }
224
225 pub(super) fn update_table_row_count(&mut self, table_row_count: usize) {
226 self.table_row_count = Some(table_row_count)
227 }
228
229 pub fn low_is_full(&self) -> bool {
230 if let Some(low) = &self.low {
231 assert!(low.len() <= self.offset);
232 let full = low.len() == self.offset;
233 if !full {
234 assert!(self.middle.is_empty());
235 assert!(self.high.is_empty());
236 }
237 full
238 } else {
239 true
240 }
241 }
242
243 pub fn middle_is_full(&self) -> bool {
244 if !WITH_TIES {
246 assert!(
247 self.middle.len() <= self.limit,
248 "the middle cache exceeds the capacity\n{self:?}"
249 );
250 }
251 let full = self.middle.len() >= self.limit;
252 if full {
253 assert!(self.low_is_full());
254 } else {
255 assert!(
256 self.high.is_empty(),
257 "the high cache is not empty when middle cache is not full:\n{self:?}"
258 );
259 }
260 full
261 }
262
263 pub fn high_is_full(&self) -> bool {
264 if !WITH_TIES {
266 assert!(self.high.len() <= self.high_cache_capacity);
267 }
268 self.high.len() >= self.high_cache_capacity
269 }
270
271 fn high_is_synced(&self) -> bool {
272 if !self.high.is_empty() {
273 true
274 } else {
275 self.table_row_count
277 .map(|n| n == self.len())
278 .unwrap_or(false)
279 }
280 }
281
282 fn last_cache_key_before_high(&self) -> Option<&CacheKey> {
283 let middle_last_key = self.middle.last_key_value().map(|(k, _)| k);
284 middle_last_key.or_else(|| {
285 self.low
286 .as_ref()
287 .and_then(Cache::last_key_value)
288 .map(|(k, _)| k)
289 })
290 }
291}
292
impl TopNCacheTrait for TopNCache<false> {
    fn insert(&mut self, cache_key: CacheKey, row: impl Row, staging: &mut TopNStaging) {
        // Keep the tracked table row count in sync with this insertion.
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count += 1;
        }

        let mut to_insert = (cache_key, (&row).into());
        // Set once `to_insert` is a row evicted from a lower segment; such a row is
        // by construction larger than everything in that segment, so the usual
        // "smaller than the segment's last key" check can be skipped downstream.
        let mut is_last_of_lower_cache = false;

        let low_is_full = self.low_is_full();
        if let Some(low) = &mut self.low {
            // Low not full yet: the row always lands there (middle/high are
            // asserted empty in that state by `low_is_full`).
            if !low_is_full {
                low.insert(to_insert.0, to_insert.1);
                return;
            }

            // Low is full: if the new row ranks before low's last row, it displaces
            // that row, which then cascades into middle.
            let low_last = low.last_entry().unwrap();
            if &to_insert.0 < low_last.key() {
                let low_last = low_last.remove_entry();
                low.insert(to_insert.0, to_insert.1);
                to_insert = low_last;
                is_last_of_lower_cache = true;
            }
        }

        // Middle not full: insert and record as an output change.
        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return;
        }

        // Middle is full: displace its last row if the incoming one ranks before it
        // (unconditionally if it already cascaded out of low).
        let middle_last = self.middle.last_entry().unwrap();
        if is_last_of_lower_cache || &to_insert.0 < middle_last.key() {
            let middle_last = middle_last.remove_entry();
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());

            // Output change: the displaced row leaves the result, the new row enters.
            staging.delete(middle_last.0.clone(), middle_last.1.clone());
            staging.insert(to_insert.0, to_insert.1);

            to_insert = middle_last;
            is_last_of_lower_cache = true;
        }

        // Only touch the high segment when doing so keeps it a true prefix of the
        // remaining rows: either the row cascaded down (so it belongs at high's
        // front), or high is synced with the state table.
        if is_last_of_lower_cache || self.high_is_synced() {
            if self.high.is_empty() {
                self.high.insert(to_insert.0, to_insert.1);
                return;
            }

            let high_is_full = self.high_is_full();
            let high_last = self.high.last_entry().unwrap();

            if is_last_of_lower_cache || &to_insert.0 < high_last.key() {
                // Respect the capacity bound by evicting high's last row.
                if high_is_full {
                    high_last.remove_entry();
                }
                self.high.insert(to_insert.0, to_insert.1);
            }
        }
    }

    async fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row,
        staging: &mut TopNStaging,
    ) -> StreamExecutorResult<()> {
        // Inconsistent stream: the tracked row count claims the table is empty.
        // Report and forget the count instead of underflowing below.
        if !enable_strict_consistency() && self.table_row_count == Some(0) {
            consistency_error!("table row count is 0, but we receive a DELETE operation");
            self.table_row_count = None;
        }
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count -= 1;
        }

        if self.middle_is_full() && &cache_key > self.middle.last_key_value().unwrap().0 {
            // The key ranks after middle's last row, so it can only live in high.
            // Not an output row, hence no staging change; removing an uncached key
            // is a no-op.
            self.high.remove(&cache_key);
        } else if self.low_is_full()
            && self
                .low
                .as_ref()
                .map(|low| &cache_key > low.last_key_value().unwrap().0)
                .unwrap_or(
                    // No low cache means offset == 0: the key belongs to middle.
                    true,
                )
        {
            // The key belongs to the middle (output) segment.
            let removed = self.middle.remove(&cache_key);
            staging.delete(cache_key.clone(), (&row).into());

            if removed.is_none() {
                consistency_error!(
                    ?group_key,
                    ?cache_key,
                    "cache key not found in middle cache"
                );
                return Ok(());
            }

            // Refill high from the state table if it may be missing rows.
            if !self.high_is_synced() {
                self.high.clear();
                managed_state
                    .fill_high_cache(
                        group_key,
                        self,
                        self.last_cache_key_before_high().cloned(),
                        self.high_cache_capacity,
                    )
                    .await?;
            }

            // Promote high's first row into middle to restore `limit` output rows.
            if !self.high.is_empty() {
                let high_first = self.high.pop_first().unwrap();
                self.middle
                    .insert(high_first.0.clone(), high_first.1.clone());
                staging.insert(high_first.0, high_first.1);
            }

            assert!(self.high.is_empty() || self.middle.len() == self.limit);
        } else {
            // The key belongs to the low segment (low exists and isn't past it).
            let low = self.low.as_mut().unwrap();
            let removed = low.remove(&cache_key);

            if removed.is_none() {
                consistency_error!(?group_key, ?cache_key, "cache key not found in low cache");
                return Ok(());
            }

            // Shift middle's first row down into low, then promote from high into
            // middle (refilling high from the table first if needed).
            if !self.middle.is_empty() {
                let middle_first = self.middle.pop_first().unwrap();
                staging.delete(middle_first.0.clone(), middle_first.1.clone());
                low.insert(middle_first.0, middle_first.1);

                if !self.high_is_synced() {
                    self.high.clear();
                    managed_state
                        .fill_high_cache(
                            group_key,
                            self,
                            self.last_cache_key_before_high().cloned(),
                            self.high_cache_capacity,
                        )
                        .await?;
                }

                if !self.high.is_empty() {
                    let high_first = self.high.pop_first().unwrap();
                    self.middle
                        .insert(high_first.0.clone(), high_first.1.clone());
                    staging.insert(high_first.0, high_first.1);
                }
            }
        }

        Ok(())
    }
}
484
485impl TopNCacheTrait for TopNCache<true> {
486 fn insert(&mut self, cache_key: CacheKey, row: impl Row, staging: &mut TopNStaging) {
487 if let Some(row_count) = self.table_row_count.as_mut() {
488 *row_count += 1;
489 }
490
491 assert!(
492 self.low.is_none(),
493 "Offset is not supported yet for WITH TIES, so low cache should be None"
494 );
495
496 let to_insert: (CacheKey, CompactedRow) = (cache_key, (&row).into());
497
498 if !self.middle_is_full() {
501 self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
502 staging.insert(to_insert.0.clone(), to_insert.1);
503 return;
504 }
505
506 let to_insert_sort_key = &(to_insert.0).0;
509 let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();
510
511 match to_insert_sort_key.cmp(&middle_last_sort_key) {
512 Ordering::Less => {
513 let n_ties_of_last = self
515 .middle
516 .range((middle_last_sort_key.clone(), vec![])..)
517 .count();
518 if self.middle.len() + 1 - n_ties_of_last >= self.limit {
526 while let Some(middle_last) = self.middle.last_entry()
529 && middle_last.key().0 == middle_last_sort_key
530 {
531 let middle_last = middle_last.remove_entry();
532 staging.delete(middle_last.0.clone(), middle_last.1.clone());
533 self.high.insert(middle_last.0, middle_last.1);
535 }
536 }
537 if self.high.len() > self.high_cache_capacity {
538 let high_last = self.high.pop_last().unwrap();
540 let high_last_sort_key = (high_last.0).0;
541 self.high.retain(|k, _| k.0 != high_last_sort_key);
544 }
545
546 self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
547 staging.insert(to_insert.0, to_insert.1);
548 }
549 Ordering::Equal => {
550 self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
552 staging.insert(to_insert.0, to_insert.1);
553 }
554 Ordering::Greater => {
555 if self.high_is_synced() {
558 if self.high.is_empty() {
561 self.high.insert(to_insert.0, to_insert.1);
563 return;
564 }
565
566 if to_insert_sort_key <= &self.high.last_key().unwrap().0 {
567 self.high.insert(to_insert.0, to_insert.1);
571 }
572
573 if self.high.len() > self.high_cache_capacity {
574 let high_last = self.high.pop_last().unwrap();
576 let high_last_sort_key = (high_last.0).0;
577 self.high.retain(|k, _| k.0 != high_last_sort_key);
580 }
581 }
582 }
583 }
584 }
585
586 async fn delete<S: StateStore>(
587 &mut self,
588 group_key: Option<impl GroupKey>,
589 managed_state: &mut ManagedTopNState<S>,
590 cache_key: CacheKey,
591 row: impl Row,
592 staging: &mut TopNStaging,
593 ) -> StreamExecutorResult<()> {
594 if !enable_strict_consistency() && self.table_row_count == Some(0) {
595 self.table_row_count = None;
598 }
599 if let Some(row_count) = self.table_row_count.as_mut() {
600 *row_count -= 1;
601 }
602
603 assert!(
604 self.low.is_none(),
605 "Offset is not supported yet for WITH TIES, so low cache should be None"
606 );
607
608 if self.middle.is_empty() {
609 consistency_error!(
610 ?group_key,
611 ?cache_key,
612 "middle cache is empty, but we receive a DELETE operation"
613 );
614 staging.delete(cache_key, (&row).into());
615 return Ok(());
616 }
617
618 let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();
619
620 let to_delete_sort_key = cache_key.0.clone();
621 if to_delete_sort_key > middle_last_sort_key {
622 self.high.remove(&cache_key);
624 } else {
625 self.middle.remove(&cache_key);
627 staging.delete(cache_key.clone(), (&row).into());
628 if self.middle.len() >= self.limit {
629 return Ok(());
631 }
632
633 if !self.high_is_synced() {
635 managed_state
636 .fill_high_cache(
637 group_key,
638 self,
639 self.last_cache_key_before_high().cloned(),
640 self.high_cache_capacity,
641 )
642 .await?;
643 }
644
645 if !self.high.is_empty() {
647 let high_first = self.high.pop_first().unwrap();
648 let high_first_sort_key = (high_first.0).0.clone();
649 assert!(high_first_sort_key > middle_last_sort_key);
650
651 self.middle
652 .insert(high_first.0.clone(), high_first.1.clone());
653 staging.insert(high_first.0, high_first.1);
654
655 for (cache_key, row) in self.high.extract_if(|k, _| k.0 == high_first_sort_key) {
656 self.middle.insert(cache_key.clone(), row.clone());
657 staging.insert(cache_key, row);
658 }
659 }
660 }
661
662 Ok(())
663 }
664}
665
/// Insert-only interface for TopN over append-only streams. Since rows are never
/// deleted upstream, the cache itself writes to / deletes from `managed_state`
/// (rows that can never enter the TopN result need not be persisted).
pub trait AppendOnlyTopNCacheTrait {
    /// Insert a row, recording output changes in `staging` and mirroring any
    /// insert/evict into the state table via `managed_state`.
    /// `row_deserializer` is needed to materialize evicted rows for state deletion.
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()>;
}
684
impl AppendOnlyTopNCacheTrait for TopNCache<false> {
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()> {
        // Append-only: a row ranking at/after middle's last row when middle is full
        // can never enter the result, so it is dropped without even being persisted.
        if self.middle_is_full() && &cache_key >= self.middle.last_key().unwrap() {
            return Ok(());
        }
        managed_state.insert(row_ref);

        let mut to_insert = (cache_key, row_ref.into());

        let low_is_full = self.low_is_full();
        if let Some(low) = &mut self.low {
            // Low not full yet: the row lands there.
            if !low_is_full {
                low.insert(to_insert.0, to_insert.1);
                return Ok(());
            }

            // Low is full: displace its last row if the new one ranks before it;
            // the displaced row cascades into middle.
            let low_last = low.last_entry().unwrap();
            if &to_insert.0 < low_last.key() {
                let low_last = low_last.remove_entry();
                low.insert(to_insert.0, to_insert.1);
                to_insert = low_last;
            }
        }

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return Ok(());
        }

        // Middle is full: evict its last row from both the cache and the state
        // table. The early return above should guarantee the inserted row ranks
        // before it.
        let middle_last = self.middle.pop_last().unwrap();
        debug_assert!(to_insert.0 < middle_last.0);
        managed_state.delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
        staging.delete(middle_last.0, middle_last.1);

        self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
        staging.insert(to_insert.0, to_insert.1);

        Ok(())
    }
}
744
impl AppendOnlyTopNCacheTrait for TopNCache<true> {
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()> {
        assert!(
            self.low.is_none(),
            "Offset is not supported yet for WITH TIES, so low cache should be empty"
        );

        let to_insert = (cache_key, row_ref);

        // Middle not full: persist, cache, and record as an output change.
        if !self.middle_is_full() {
            managed_state.insert(to_insert.1);
            let row: CompactedRow = to_insert.1.into();
            self.middle.insert(to_insert.0.clone(), row.clone());
            staging.insert(to_insert.0, row);
            return Ok(());
        }

        // With ties, ranking is decided by the sort-key part (`.0`) of the cache key.
        let to_insert_sort_key = &(to_insert.0).0;
        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();

        match to_insert_sort_key.cmp(&middle_last_sort_key) {
            Ordering::Less => {
                // Count how many rows tie with middle's last sort key.
                let n_ties_of_last = self
                    .middle
                    .range((middle_last_sort_key.clone(), vec![])..)
                    .count();
                // If middle would still have >= limit rows after dropping the whole
                // tie group of the last key, evict that group from both the cache
                // and the state table (append-only: it can never come back).
                if self.middle.len() + 1 - n_ties_of_last >= self.limit {
                    while let Some(middle_last) = self.middle.last_entry()
                        && middle_last.key().0 == middle_last_sort_key
                    {
                        let middle_last = middle_last.remove_entry();
                        managed_state
                            .delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
                        staging.delete(middle_last.0, middle_last.1);
                    }
                }

                managed_state.insert(to_insert.1);
                let row: CompactedRow = to_insert.1.into();
                self.middle.insert(to_insert.0.clone(), row.clone());
                staging.insert(to_insert.0, row);
            }
            Ordering::Equal => {
                // Ties with the last output row: it joins the output as well.
                managed_state.insert(to_insert.1);
                let row: CompactedRow = to_insert.1.into();
                self.middle.insert(to_insert.0.clone(), row.clone());
                staging.insert(to_insert.0, row);
            }
            Ordering::Greater => {
                // Ranks after all output rows; append-only means it can never enter
                // the result, so it is neither cached nor persisted.
            }
        }

        Ok(())
    }
}
823
/// Buffer of output-row (middle-segment) changes accumulated while applying a batch
/// of TopN cache updates, keyed by [`CacheKey`].
// NOTE(review): relies on `ChangeBuffer` semantics (presumably compacting
// intermediate insert/delete pairs per key) — confirm against its definition.
#[derive(Debug, Default)]
pub struct TopNStaging {
    inner: ChangeBuffer<CacheKey, CompactedRow>,
}
830
831impl TopNStaging {
832 pub fn new() -> Self {
833 Self::default()
834 }
835
836 fn insert(&mut self, cache_key: CacheKey, row: CompactedRow) {
839 self.inner.insert(cache_key, row);
840 }
841
842 fn delete(&mut self, cache_key: CacheKey, row: CompactedRow) {
845 self.inner.delete(cache_key, row);
846 }
847
848 pub fn len(&self) -> usize {
850 self.inner.len()
851 }
852
853 pub fn is_empty(&self) -> bool {
855 self.inner.is_empty()
856 }
857
858 pub fn into_deserialized_changes(
860 self,
861 deserializer: &RowDeserializer,
862 ) -> impl Iterator<Item = StreamExecutorResult<Record<OwnedRow>>> + '_ {
863 self.inner.into_records().map(|record| {
864 record.try_map(|row| {
865 deserializer
866 .deserialize(row.row.as_ref())
867 .map_err(Into::into)
868 })
869 })
870 }
871}
872
#[cfg(test)]
mod tests {
    use risingwave_common::types::DataType;

    use super::*;

    /// `new` falls back to the default floor when the capacity formula is no larger.
    #[test]
    fn test_topn_cache_new_uses_default_min_capacity() {
        let cache = TopNCache::<false>::new(0, 5, vec![DataType::Int32]);
        assert_eq!(cache.high_cache_capacity, TOPN_CACHE_MIN_CAPACITY);
    }

    /// An explicitly provided minimum capacity is honored when it dominates.
    #[test]
    fn test_topn_cache_with_custom_min_capacity() {
        let min_cap = 25;
        let cache = TopNCache::<false>::with_min_capacity(0, 5, vec![DataType::Int32], min_cap);
        assert_eq!(cache.high_cache_capacity, min_cap);
    }

    /// With a tiny floor, `(offset + limit) * factor` decides the capacity.
    #[test]
    fn test_topn_cache_high_capacity_factor_respected() {
        let (offset, limit) = (2, 5);
        let cache =
            TopNCache::<false>::with_min_capacity(offset, limit, vec![DataType::Int32], 1);
        assert_eq!(
            cache.high_cache_capacity,
            (offset + limit) * TOPN_CACHE_HIGH_CAPACITY_FACTOR
        );
    }

    /// When the floor exceeds the formula's result, the floor wins.
    #[test]
    fn test_topn_cache_min_capacity_takes_precedence_when_larger() {
        let floor = 100;
        let cache = TopNCache::<false>::with_min_capacity(0, 5, vec![DataType::Int32], floor);
        assert_eq!(cache.high_cache_capacity, floor);
        assert!(floor > 5 * TOPN_CACHE_HIGH_CAPACITY_FACTOR);
    }
}