1use std::cmp::Ordering;
16use std::collections::BTreeMap;
17use std::fmt::Debug;
18use std::future::Future;
19
20use itertools::Itertools;
21use risingwave_common::array::{Op, RowRef};
22use risingwave_common::row::{CompactedRow, OwnedRow, Row, RowDeserializer, RowExt};
23use risingwave_common::types::DataType;
24use risingwave_common_estimate_size::EstimateSize;
25use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
26use risingwave_storage::StateStore;
27
28use super::{GroupKey, ManagedTopNState};
29use crate::consistency::{consistency_error, enable_strict_consistency};
30use crate::executor::error::StreamExecutorResult;
31
/// Key ordering the cache: `(sort key, extra key)`, both memcomparable-encoded.
/// Ties are detected by comparing only the first component (see the `WITH_TIES`
/// impls); the second component is presumably the serialized pk — TODO confirm.
pub type CacheKey = (Vec<u8>, Vec<u8>);
pub type Cache = EstimatedBTreeMap<CacheKey, CompactedRow>;

// High cache capacity = (offset + limit) * factor, but at least the minimum.
const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
const TOPN_CACHE_MIN_CAPACITY: usize = 10;
38
/// Layered in-memory cache for a TopN executor, ordered by [`CacheKey`].
///
/// Rows are kept in three layers, filled bottom-up:
/// - `low`: the rows skipped by `OFFSET` (only present when `offset > 0`);
/// - `middle`: the output rows, i.e. rows in `[offset, offset + limit)`;
/// - `high`: a bounded lookahead of rows after the output range, used to
///   backfill `middle` on deletions without always hitting the state table.
pub struct TopNCache<const WITH_TIES: bool> {
    // `None` iff `offset == 0` (see `new`).
    pub low: Option<Cache>,

    // The output rows; holds at most `limit` rows, except that with
    // `WITH_TIES` it may temporarily exceed `limit` (see `middle_is_full`).
    pub middle: Cache,

    // Lookahead rows beyond the output range, bounded by `high_cache_capacity`.
    pub high: Cache,
    // `(offset + limit) * TOPN_CACHE_HIGH_CAPACITY_FACTOR`, saturating, and
    // at least `TOPN_CACHE_MIN_CAPACITY` (computed in `new`).
    pub high_cache_capacity: usize,

    pub offset: usize,
    pub limit: usize,

    // Row count of the backing state table, if known. Lets `high_is_synced`
    // tell whether the cache already holds every row of the table.
    table_row_count: Option<usize>,

    // Data types of the cached rows; only used to deserialize rows for the
    // `Debug` output.
    data_types: Vec<DataType>,
}
91
92impl<const WITH_TIES: bool> EstimateSize for TopNCache<WITH_TIES> {
93 fn estimated_heap_size(&self) -> usize {
94 self.low.estimated_heap_size()
95 + self.middle.estimated_heap_size()
96 + self.high.estimated_heap_size()
97 }
98}
99
impl<const WITH_TIES: bool> Debug for TopNCache<WITH_TIES> {
    /// Pretty-print the cache: scalar config first, then each layer with its
    /// entries deserialized via `data_types` for readability.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "TopNCache {{\n offset: {}, limit: {}, high_cache_capacity: {},\n",
            self.offset, self.limit, self.high_cache_capacity
        )?;

        // Helper shared by the three layers: one `(key, row)` entry per line.
        fn format_cache(
            f: &mut std::fmt::Formatter<'_>,
            cache: &Cache,
            data_types: &[DataType],
        ) -> std::fmt::Result {
            if cache.is_empty() {
                return write!(f, " <empty>");
            }
            write!(
                f,
                " {}",
                cache
                    .iter()
                    .format_with("\n ", |item, f| f(&format_args!(
                        "{:?}, {}",
                        item.0,
                        // NOTE: panics if a cached row does not match `data_types`.
                        item.1.deserialize(data_types).unwrap().display(),
                    )))
            )
        }

        writeln!(f, " low:")?;
        if let Some(low) = &self.low {
            format_cache(f, low, &self.data_types)?;
        } else {
            writeln!(f, " <none>")?;
        }
        writeln!(f, "\n middle:")?;
        format_cache(f, &self.middle, &self.data_types)?;
        writeln!(f, "\n high:")?;
        format_cache(f, &self.high, &self.data_types)?;

        write!(f, "\n}}")?;
        Ok(())
    }
}
144
/// Operations a TopN cache supports when the input stream may retract rows.
pub trait TopNCacheTrait {
    /// Insert a row into the cache, recording any resulting changes to the
    /// output (middle) layer in `staging`.
    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging);

    /// Delete a row from the cache, recording output changes in `staging`.
    /// May read from `managed_state` to refill the high cache when it is not
    /// in sync with the state table.
    fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row + Send,
        staging: &mut TopNStaging,
    ) -> impl Future<Output = StreamExecutorResult<()>> + Send;
}
172
173impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
174 pub fn new(offset: usize, limit: usize, data_types: Vec<DataType>) -> Self {
176 assert!(limit > 0);
177 if WITH_TIES {
178 assert!(offset == 0, "OFFSET is not supported with WITH TIES");
181 }
182 let high_cache_capacity = offset
183 .checked_add(limit)
184 .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
185 .unwrap_or(usize::MAX)
186 .max(TOPN_CACHE_MIN_CAPACITY);
187 Self {
188 low: if offset > 0 { Some(Cache::new()) } else { None },
189 middle: Cache::new(),
190 high: Cache::new(),
191 high_cache_capacity,
192 offset,
193 limit,
194 table_row_count: None,
195 data_types,
196 }
197 }
198
199 #[allow(dead_code)]
201 pub fn clear(&mut self) {
202 self.low.as_mut().map(Cache::clear);
203 self.middle.clear();
204 self.high.clear();
205 }
206
207 pub fn len(&self) -> usize {
209 self.low.as_ref().map(Cache::len).unwrap_or(0) + self.middle.len() + self.high.len()
210 }
211
212 pub(super) fn update_table_row_count(&mut self, table_row_count: usize) {
213 self.table_row_count = Some(table_row_count)
214 }
215
216 pub fn low_is_full(&self) -> bool {
217 if let Some(low) = &self.low {
218 assert!(low.len() <= self.offset);
219 let full = low.len() == self.offset;
220 if !full {
221 assert!(self.middle.is_empty());
222 assert!(self.high.is_empty());
223 }
224 full
225 } else {
226 true
227 }
228 }
229
230 pub fn middle_is_full(&self) -> bool {
231 if !WITH_TIES {
233 assert!(
234 self.middle.len() <= self.limit,
235 "the middle cache exceeds the capacity\n{self:?}"
236 );
237 }
238 let full = self.middle.len() >= self.limit;
239 if full {
240 assert!(self.low_is_full());
241 } else {
242 assert!(
243 self.high.is_empty(),
244 "the high cache is not empty when middle cache is not full:\n{self:?}"
245 );
246 }
247 full
248 }
249
250 pub fn high_is_full(&self) -> bool {
251 if !WITH_TIES {
253 assert!(self.high.len() <= self.high_cache_capacity);
254 }
255 self.high.len() >= self.high_cache_capacity
256 }
257
258 fn high_is_synced(&self) -> bool {
259 if !self.high.is_empty() {
260 true
261 } else {
262 self.table_row_count
264 .map(|n| n == self.len())
265 .unwrap_or(false)
266 }
267 }
268
269 fn last_cache_key_before_high(&self) -> Option<&CacheKey> {
270 let middle_last_key = self.middle.last_key_value().map(|(k, _)| k);
271 middle_last_key.or_else(|| {
272 self.low
273 .as_ref()
274 .and_then(Cache::last_key_value)
275 .map(|(k, _)| k)
276 })
277 }
278}
279
impl TopNCacheTrait for TopNCache<false> {
    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging) {
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count += 1;
        }

        let mut to_insert = (cache_key, (&row).into());
        // Set when `to_insert` was just evicted from a lower layer, which means
        // it is known to sort before everything in the next layer.
        let mut is_last_of_lower_cache = false;

        let low_is_full = self.low_is_full();
        if let Some(low) = &mut self.low {
            // Layers are filled bottom-up: fill the low cache first.
            if !low_is_full {
                low.insert(to_insert.0, to_insert.1);
                return;
            }

            // Low cache is full. If the new row sorts before its last entry,
            // swap: the new row enters low, and the evicted last entry of low
            // becomes the row to push into middle.
            let low_last = low.last_entry().unwrap();
            if &to_insert.0 < low_last.key() {
                let low_last = low_last.remove_entry();
                low.insert(to_insert.0, to_insert.1);
                to_insert = low_last;
                is_last_of_lower_cache = true;
            }
        }

        // Try the middle (output) cache next.
        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return;
        }

        // Middle is full; same swap as above. The comparison is always true
        // when the row came from low, hence the shortcut flag.
        let middle_last = self.middle.last_entry().unwrap();
        if is_last_of_lower_cache || &to_insert.0 < middle_last.key() {
            let middle_last = middle_last.remove_entry();
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());

            // The output changes: evicted row out, new row in.
            staging.delete(middle_last.0.clone(), middle_last.1.clone());
            staging.insert(to_insert.0, to_insert.1);

            to_insert = middle_last;
            is_last_of_lower_cache = true;
        }

        // Finally try the high cache. Only insert when high is synced with the
        // state table (or the row just fell out of middle); otherwise high
        // could end up caching a non-prefix of the remaining rows.
        if is_last_of_lower_cache || self.high_is_synced() {
            if self.high.is_empty() {
                self.high.insert(to_insert.0, to_insert.1);
                return;
            }

            let high_is_full = self.high_is_full();
            let high_last = self.high.last_entry().unwrap();

            // Rows sorting after everything in high are skipped: high stays a
            // prefix of the remaining rows either way.
            if is_last_of_lower_cache || &to_insert.0 < high_last.key() {
                // Evict the largest entry when at capacity, then insert.
                if high_is_full {
                    high_last.remove_entry();
                }
                self.high.insert(to_insert.0, to_insert.1);
            }
        }
    }

    async fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row + Send,
        staging: &mut TopNStaging,
    ) -> StreamExecutorResult<()> {
        // In non-strict mode, tolerate a DELETE on a supposedly-empty table by
        // forgetting the row count instead of underflowing it below.
        if !enable_strict_consistency() && self.table_row_count == Some(0) {
            consistency_error!("table row count is 0, but we receive a DELETE operation");
            self.table_row_count = None;
        }
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count -= 1;
        }

        if self.middle_is_full() && &cache_key > self.middle.last_key_value().unwrap().0 {
            // The row sorts after the output range, so it can only be in high.
            self.high.remove(&cache_key);
        } else if self.low_is_full()
            && self
                .low
                .as_ref()
                .map(|low| &cache_key > low.last_key_value().unwrap().0)
                .unwrap_or(
                    true, // no low cache (offset == 0): the row belongs to middle
                )
        {
            // The row belongs to the middle (output) cache.
            let removed = self.middle.remove(&cache_key);
            staging.delete(cache_key.clone(), (&row).into());

            if removed.is_none() {
                consistency_error!(
                    ?group_key,
                    ?cache_key,
                    "cache key not found in middle cache"
                );
                return Ok(());
            }

            // Refill high from the state table if it may be stale, so the
            // promotion below picks the true successor row.
            if !self.high_is_synced() {
                self.high.clear();
                managed_state
                    .fill_high_cache(
                        group_key,
                        self,
                        self.last_cache_key_before_high().cloned(),
                        self.high_cache_capacity,
                    )
                    .await?;
            }

            // Backfill middle with the smallest entry of high, if any.
            if !self.high.is_empty() {
                let high_first = self.high.pop_first().unwrap();
                self.middle
                    .insert(high_first.0.clone(), high_first.1.clone());
                staging.insert(high_first.0, high_first.1);
            }

            assert!(self.high.is_empty() || self.middle.len() == self.limit);
        } else {
            // The row belongs to the low cache.
            let low = self.low.as_mut().unwrap();
            let removed = low.remove(&cache_key);

            if removed.is_none() {
                consistency_error!(?group_key, ?cache_key, "cache key not found in low cache");
                return Ok(());
            }

            // Shift one row down from middle into low, then backfill middle
            // from high (refilling high from the state table if needed).
            if !self.middle.is_empty() {
                let middle_first = self.middle.pop_first().unwrap();
                staging.delete(middle_first.0.clone(), middle_first.1.clone());
                low.insert(middle_first.0, middle_first.1);

                if !self.high_is_synced() {
                    self.high.clear();
                    managed_state
                        .fill_high_cache(
                            group_key,
                            self,
                            self.last_cache_key_before_high().cloned(),
                            self.high_cache_capacity,
                        )
                        .await?;
                }

                if !self.high.is_empty() {
                    let high_first = self.high.pop_first().unwrap();
                    self.middle
                        .insert(high_first.0.clone(), high_first.1.clone());
                    staging.insert(high_first.0, high_first.1);
                }
            }
        }

        Ok(())
    }
}
471
impl TopNCacheTrait for TopNCache<true> {
    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging) {
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count += 1;
        }

        assert!(
            self.low.is_none(),
            "Offset is not supported yet for WITH TIES, so low cache should be None"
        );

        let to_insert: (CacheKey, CompactedRow) = (cache_key, (&row).into());

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0.clone(), to_insert.1);
            return;
        }

        // Ties are decided on the sort-key part (`CacheKey.0`) only.
        let to_insert_sort_key = &(to_insert.0).0;
        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();

        match to_insert_sort_key.cmp(&middle_last_sort_key) {
            Ordering::Less => {
                // Number of rows tied with the current last sort key of middle.
                let n_ties_of_last = self
                    .middle
                    .range((middle_last_sort_key.clone(), vec![])..)
                    .count();
                // Only demote the last tie group to high if middle would still
                // hold at least `limit` rows after dropping the whole group.
                if self.middle.len() + 1 - n_ties_of_last >= self.limit {
                    while let Some(middle_last) = self.middle.last_entry()
                        && middle_last.key().0 == middle_last_sort_key
                    {
                        let middle_last = middle_last.remove_entry();
                        staging.delete(middle_last.0.clone(), middle_last.1.clone());
                        self.high.insert(middle_last.0, middle_last.1);
                    }
                }
                if self.high.len() > self.high_cache_capacity {
                    // Over capacity: drop the largest sort-key group entirely,
                    // so high never keeps a partial tie group.
                    let high_last = self.high.pop_last().unwrap();
                    let high_last_sort_key = (high_last.0).0;
                    self.high.retain(|k, _| k.0 != high_last_sort_key);
                }

                self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
                staging.insert(to_insert.0, to_insert.1);
            }
            Ordering::Equal => {
                // Tied with the current last group: joins the output.
                self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
                staging.insert(to_insert.0, to_insert.1);
            }
            Ordering::Greater => {
                // Sorts after the output; goes into high, but only when high
                // can be trusted (see `high_is_synced`).
                if self.high_is_synced() {
                    if self.high.is_empty() {
                        self.high.insert(to_insert.0, to_insert.1);
                        return;
                    }

                    // Rows after everything in high are skipped: high stays a
                    // prefix of the remaining rows either way.
                    if to_insert_sort_key <= &self.high.last_key().unwrap().0 {
                        self.high.insert(to_insert.0, to_insert.1);
                    }

                    if self.high.len() > self.high_cache_capacity {
                        // Same whole-tie-group eviction as above.
                        let high_last = self.high.pop_last().unwrap();
                        let high_last_sort_key = (high_last.0).0;
                        self.high.retain(|k, _| k.0 != high_last_sort_key);
                    }
                }
            }
        }
    }

    async fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row + Send,
        staging: &mut TopNStaging,
    ) -> StreamExecutorResult<()> {
        // In non-strict mode, forget the row count rather than underflow it.
        if !enable_strict_consistency() && self.table_row_count == Some(0) {
            self.table_row_count = None;
        }
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count -= 1;
        }

        assert!(
            self.low.is_none(),
            "Offset is not supported yet for WITH TIES, so low cache should be None"
        );

        if self.middle.is_empty() {
            consistency_error!(
                ?group_key,
                ?cache_key,
                "middle cache is empty, but we receive a DELETE operation"
            );
            staging.delete(cache_key, (&row).into());
            return Ok(());
        }

        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();

        let to_delete_sort_key = cache_key.0.clone();
        if to_delete_sort_key > middle_last_sort_key {
            // Sorts after the output range: can only be in high.
            self.high.remove(&cache_key);
        } else {
            // Deleting from the output range.
            self.middle.remove(&cache_key);
            staging.delete(cache_key.clone(), (&row).into());
            if self.middle.len() >= self.limit {
                // Still enough rows (possible due to ties); nothing to backfill.
                return Ok(());
            }

            if !self.high_is_synced() {
                managed_state
                    .fill_high_cache(
                        group_key,
                        self,
                        self.last_cache_key_before_high().cloned(),
                        self.high_cache_capacity,
                    )
                    .await?;
            }

            // Promote the smallest sort-key group of high into middle —
            // the whole tie group, not just one row.
            if !self.high.is_empty() {
                let high_first = self.high.pop_first().unwrap();
                let high_first_sort_key = (high_first.0).0.clone();
                assert!(high_first_sort_key > middle_last_sort_key);

                self.middle
                    .insert(high_first.0.clone(), high_first.1.clone());
                staging.insert(high_first.0, high_first.1);

                for (cache_key, row) in self.high.extract_if(|k, _| k.0 == high_first_sort_key) {
                    self.middle.insert(cache_key.clone(), row.clone());
                    staging.insert(cache_key, row);
                }
            }
        }

        Ok(())
    }
}
652
/// Insert-only variant of the TopN cache operations, used when the input
/// stream never retracts rows. Writes to the state table itself (via
/// `managed_state`), so no separate `delete` is needed.
pub trait AppendOnlyTopNCacheTrait {
    /// Insert a row, updating both the cache and `managed_state`, and record
    /// output changes in `staging`.
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()>;
}
671
impl AppendOnlyTopNCacheTrait for TopNCache<false> {
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()> {
        // Append-only: a row sorting at/after the last output row can never
        // enter the result, so skip it without touching the state table.
        if self.middle_is_full() && &cache_key >= self.middle.last_key().unwrap() {
            return Ok(());
        }
        managed_state.insert(row_ref);

        let mut to_insert = (cache_key, row_ref.into());

        let low_is_full = self.low_is_full();
        if let Some(low) = &mut self.low {
            // Fill the low cache first (layers are filled bottom-up).
            if !low_is_full {
                low.insert(to_insert.0, to_insert.1);
                return Ok(());
            }

            // Swap with low's last entry if the new row sorts before it; the
            // evicted entry then becomes the row to push into middle.
            let low_last = low.last_entry().unwrap();
            if &to_insert.0 < low_last.key() {
                let low_last = low_last.remove_entry();
                low.insert(to_insert.0, to_insert.1);
                to_insert = low_last;
            }
        }

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return Ok(());
        }

        // Middle is full: evict its last row from both the cache and the
        // state table, then insert the new row.
        let middle_last = self.middle.pop_last().unwrap();
        debug_assert!(to_insert.0 < middle_last.0);
        managed_state.delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
        staging.delete(middle_last.0, middle_last.1);

        self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
        staging.insert(to_insert.0, to_insert.1);

        Ok(())
    }
}
731
impl AppendOnlyTopNCacheTrait for TopNCache<true> {
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()> {
        assert!(
            self.low.is_none(),
            "Offset is not supported yet for WITH TIES, so low cache should be empty"
        );

        let to_insert = (cache_key, row_ref);

        if !self.middle_is_full() {
            managed_state.insert(to_insert.1);
            let row: CompactedRow = to_insert.1.into();
            self.middle.insert(to_insert.0.clone(), row.clone());
            staging.insert(to_insert.0, row);
            return Ok(());
        }

        // Ties are decided on the sort-key part (`CacheKey.0`) only.
        let to_insert_sort_key = &(to_insert.0).0;
        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();

        match to_insert_sort_key.cmp(&middle_last_sort_key) {
            Ordering::Less => {
                // Number of rows tied with middle's current last sort key.
                let n_ties_of_last = self
                    .middle
                    .range((middle_last_sort_key.clone(), vec![])..)
                    .count();
                // Drop the whole last tie group — from cache AND state table —
                // only if middle still holds at least `limit` rows afterwards.
                if self.middle.len() + 1 - n_ties_of_last >= self.limit {
                    while let Some(middle_last) = self.middle.last_entry()
                        && middle_last.key().0 == middle_last_sort_key
                    {
                        let middle_last = middle_last.remove_entry();
                        managed_state
                            .delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
                        staging.delete(middle_last.0, middle_last.1);
                    }
                }

                managed_state.insert(to_insert.1);
                let row: CompactedRow = to_insert.1.into();
                self.middle.insert(to_insert.0.clone(), row.clone());
                staging.insert(to_insert.0, row);
            }
            Ordering::Equal => {
                // Tied with the last group: joins the output.
                managed_state.insert(to_insert.1);
                let row: CompactedRow = to_insert.1.into();
                self.middle.insert(to_insert.0.clone(), row.clone());
                staging.insert(to_insert.0, row);
            }
            Ordering::Greater => {
                // Sorts after the output and rows are never retracted, so it
                // can never enter the result: neither cached nor persisted.
            }
        }

        Ok(())
    }
}
810
/// Pending output changes accumulated while applying one chunk to a TopN
/// cache. Changes on the same cache key are folded together (insert + delete
/// cancel out; delete + re-insert becomes an update) so the emitted change
/// stream stays minimal. Each key appears in at most one of the three maps.
#[derive(Debug, Default)]
pub struct TopNStaging {
    // Rows removed from the output, keyed by cache key.
    to_delete: BTreeMap<CacheKey, CompactedRow>,
    // Rows added to the output.
    to_insert: BTreeMap<CacheKey, CompactedRow>,
    // Rows replaced in the output: (old row, new row).
    to_update: BTreeMap<CacheKey, (CompactedRow, CompactedRow)>,
}
819
820impl TopNStaging {
821 pub fn new() -> Self {
822 Self::default()
823 }
824
825 fn insert(&mut self, cache_key: CacheKey, row: CompactedRow) {
828 if let Some(old_row) = self.to_delete.remove(&cache_key) {
829 if old_row != row {
830 self.to_update.insert(cache_key, (old_row, row));
831 }
832 } else {
833 self.to_insert.insert(cache_key, row);
834 }
835 }
836
837 fn delete(&mut self, cache_key: CacheKey, row: CompactedRow) {
840 if self.to_insert.remove(&cache_key).is_some() {
841 } else if let Some((old_row, _)) = self.to_update.remove(&cache_key) {
843 self.to_delete.insert(cache_key, old_row);
844 } else {
845 self.to_delete.insert(cache_key, row);
846 }
847 }
848
849 pub fn len(&self) -> usize {
851 self.to_delete.len() + self.to_insert.len() + self.to_update.len()
852 }
853
854 pub fn is_empty(&self) -> bool {
856 self.to_delete.is_empty() && self.to_insert.is_empty() && self.to_update.is_empty()
857 }
858
859 pub fn into_changes(self) -> impl Iterator<Item = (Op, CompactedRow)> {
861 #[cfg(debug_assertions)]
862 {
863 let keys = self
864 .to_delete
865 .keys()
866 .chain(self.to_insert.keys())
867 .chain(self.to_update.keys())
868 .unique()
869 .count();
870 assert_eq!(
871 keys,
872 self.to_delete.len() + self.to_insert.len() + self.to_update.len(),
873 "should not have duplicate keys with different operations",
874 );
875 }
876
877 self.to_update
882 .into_values()
883 .flat_map(|(old_row, new_row)| {
884 [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)]
885 })
886 .chain(self.to_delete.into_values().map(|row| (Op::Delete, row)))
887 .chain(self.to_insert.into_values().map(|row| (Op::Insert, row)))
888 }
889
890 pub fn into_deserialized_changes(
892 self,
893 deserializer: &RowDeserializer,
894 ) -> impl Iterator<Item = StreamExecutorResult<(Op, OwnedRow)>> + '_ {
895 self.into_changes()
896 .map(|(op, row)| Ok((op, deserializer.deserialize(row.row.as_ref())?)))
897 }
898}