1use std::cmp::Ordering;
16use std::collections::BTreeMap;
17use std::fmt::Debug;
18use std::future::Future;
19
20use itertools::Itertools;
21use risingwave_common::array::{Op, RowRef};
22use risingwave_common::row::{CompactedRow, OwnedRow, Row, RowDeserializer, RowExt};
23use risingwave_common::types::DataType;
24use risingwave_common_estimate_size::EstimateSize;
25use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
26use risingwave_storage::StateStore;
27
28use super::{GroupKey, ManagedTopNState};
29use crate::consistency::{consistency_error, enable_strict_consistency};
30use crate::executor::error::StreamExecutorResult;
31
/// Key of the cache `BTreeMap`s. The first element is the memcomparable-encoded sort
/// (ORDER BY) key — ties in WITH TIES mode compare equal on it; the second element
/// presumably encodes the remaining primary-key columns for uniqueness — TODO confirm
/// against the serialization site.
pub type CacheKey = (Vec<u8>, Vec<u8>);
/// One level of the layered TopN cache, with heap-size tracking.
pub type Cache = EstimatedBTreeMap<CacheKey, CompactedRow>;

/// The high cache holds up to `(offset + limit) * this factor` entries (see
/// `TopNCache::with_min_capacity`), giving headroom to refill the result window
/// on deletions without scanning the state table.
const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
/// Lower bound on the high cache capacity regardless of `offset`/`limit`.
const TOPN_CACHE_MIN_CAPACITY: usize = 10;
/// In-memory cache for (one group of) a TopN state, layered into three ranges by
/// `CacheKey` order: rows `[0, offset)` in `low`, the result window
/// `[offset, offset + limit)` in `middle`, and a bounded lookahead after that in `high`.
pub struct TopNCache<const WITH_TIES: bool> {
    /// Rows before the OFFSET. `None` when `offset == 0`; always `None` for
    /// `WITH_TIES` (OFFSET is unsupported there, see `with_min_capacity`).
    pub low: Option<Cache>,

    /// The current TopN result window. With `WITH_TIES` it may hold more than
    /// `limit` rows when the last rank is tied.
    pub middle: Cache,

    /// Lookahead rows after the result window, used to refill `middle` on deletions
    /// without hitting the state table. May lag behind the table (see `high_is_synced`).
    pub high: Cache,
    /// Maximum number of entries kept in `high`; derived in `with_min_capacity`.
    pub high_cache_capacity: usize,

    pub offset: usize,
    pub limit: usize,

    /// Total row count of the backing state table, if known; used to decide whether
    /// an empty `high` actually means "no more rows" (see `high_is_synced`).
    table_row_count: Option<usize>,

    /// Data types of the cached rows; only used by the `Debug` impl to deserialize
    /// rows for display.
    data_types: Vec<DataType>,
}
91
92impl<const WITH_TIES: bool> EstimateSize for TopNCache<WITH_TIES> {
93 fn estimated_heap_size(&self) -> usize {
94 self.low.estimated_heap_size()
95 + self.middle.estimated_heap_size()
96 + self.high.estimated_heap_size()
97 }
98}
99
impl<const WITH_TIES: bool> Debug for TopNCache<WITH_TIES> {
    /// Pretty-prints all three cache levels, deserializing each cached row with
    /// `self.data_types` so the dump is human-readable.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "TopNCache {{\n offset: {}, limit: {}, high_cache_capacity: {},\n",
            self.offset, self.limit, self.high_cache_capacity
        )?;

        // Local helper: dump one cache level, one `(key, row)` entry per line.
        fn format_cache(
            f: &mut std::fmt::Formatter<'_>,
            cache: &Cache,
            data_types: &[DataType],
        ) -> std::fmt::Result {
            if cache.is_empty() {
                return write!(f, " <empty>");
            }
            write!(
                f,
                " {}",
                cache
                    .iter()
                    .format_with("\n ", |item, f| f(&format_args!(
                        "{:?}, {}",
                        item.0,
                        // `unwrap`: a row that fails to deserialize indicates corrupted
                        // cache content; panicking in Debug output is acceptable here.
                        item.1.deserialize(data_types).unwrap().display(),
                    )))
            )
        }

        writeln!(f, " low:")?;
        if let Some(low) = &self.low {
            format_cache(f, low, &self.data_types)?;
        } else {
            writeln!(f, " <none>")?;
        }
        writeln!(f, "\n middle:")?;
        format_cache(f, &self.middle, &self.data_types)?;
        writeln!(f, "\n high:")?;
        format_cache(f, &self.high, &self.data_types)?;

        write!(f, "\n}}")?;
        Ok(())
    }
}
144
/// Operations shared by the WITH TIES and non-WITH-TIES variants of [`TopNCache`].
pub trait TopNCacheTrait {
    /// Inserts a row into the cache, recording the effective change to the TopN
    /// output in `staging`.
    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging);

    /// Deletes a row from the cache, recording the effective change to the TopN
    /// output in `staging`. May read from `managed_state` to re-fill the high cache
    /// when it is out of sync with the state table.
    fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row + Send,
        staging: &mut TopNStaging,
    ) -> impl Future<Output = StreamExecutorResult<()>> + Send;
}
172
173impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
174 pub fn new(offset: usize, limit: usize, data_types: Vec<DataType>) -> Self {
177 Self::with_min_capacity(offset, limit, data_types, TOPN_CACHE_MIN_CAPACITY)
178 }
179
180 pub fn with_min_capacity(
183 offset: usize,
184 limit: usize,
185 data_types: Vec<DataType>,
186 min_capacity: usize,
187 ) -> Self {
188 assert!(limit > 0);
189 if WITH_TIES {
190 assert!(offset == 0, "OFFSET is not supported with WITH TIES");
193 }
194 let high_cache_capacity = offset
195 .checked_add(limit)
196 .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
197 .unwrap_or(usize::MAX)
198 .max(min_capacity);
199 Self {
200 low: if offset > 0 { Some(Cache::new()) } else { None },
201 middle: Cache::new(),
202 high: Cache::new(),
203 high_cache_capacity,
204 offset,
205 limit,
206 table_row_count: None,
207 data_types,
208 }
209 }
210
211 #[allow(dead_code)]
213 pub fn clear(&mut self) {
214 self.low.as_mut().map(Cache::clear);
215 self.middle.clear();
216 self.high.clear();
217 }
218
219 pub fn len(&self) -> usize {
221 self.low.as_ref().map(Cache::len).unwrap_or(0) + self.middle.len() + self.high.len()
222 }
223
224 pub(super) fn update_table_row_count(&mut self, table_row_count: usize) {
225 self.table_row_count = Some(table_row_count)
226 }
227
228 pub fn low_is_full(&self) -> bool {
229 if let Some(low) = &self.low {
230 assert!(low.len() <= self.offset);
231 let full = low.len() == self.offset;
232 if !full {
233 assert!(self.middle.is_empty());
234 assert!(self.high.is_empty());
235 }
236 full
237 } else {
238 true
239 }
240 }
241
242 pub fn middle_is_full(&self) -> bool {
243 if !WITH_TIES {
245 assert!(
246 self.middle.len() <= self.limit,
247 "the middle cache exceeds the capacity\n{self:?}"
248 );
249 }
250 let full = self.middle.len() >= self.limit;
251 if full {
252 assert!(self.low_is_full());
253 } else {
254 assert!(
255 self.high.is_empty(),
256 "the high cache is not empty when middle cache is not full:\n{self:?}"
257 );
258 }
259 full
260 }
261
262 pub fn high_is_full(&self) -> bool {
263 if !WITH_TIES {
265 assert!(self.high.len() <= self.high_cache_capacity);
266 }
267 self.high.len() >= self.high_cache_capacity
268 }
269
270 fn high_is_synced(&self) -> bool {
271 if !self.high.is_empty() {
272 true
273 } else {
274 self.table_row_count
276 .map(|n| n == self.len())
277 .unwrap_or(false)
278 }
279 }
280
281 fn last_cache_key_before_high(&self) -> Option<&CacheKey> {
282 let middle_last_key = self.middle.last_key_value().map(|(k, _)| k);
283 middle_last_key.or_else(|| {
284 self.low
285 .as_ref()
286 .and_then(Cache::last_key_value)
287 .map(|(k, _)| k)
288 })
289 }
290}
291
impl TopNCacheTrait for TopNCache<false> {
    /// Inserts a row, cascading it down through `low` -> `middle` -> `high`: an
    /// insert into a full lower level evicts that level's largest entry, which is
    /// then re-inserted into the next level.
    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging) {
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count += 1;
        }

        let mut to_insert = (cache_key, (&row).into());
        // Set when `to_insert` was just evicted from a lower (full) cache level; such
        // an entry must be accepted by the next level unconditionally.
        let mut is_last_of_lower_cache = false;
        let low_is_full = self.low_is_full();
        if let Some(low) = &mut self.low {
            if !low_is_full {
                // Still filling the OFFSET range; nothing visible changes downstream.
                low.insert(to_insert.0, to_insert.1);
                return;
            }

            let low_last = low.last_entry().unwrap();
            if &to_insert.0 < low_last.key() {
                // New row lands in `low`; its largest entry spills toward `middle`.
                let low_last = low_last.remove_entry();
                low.insert(to_insert.0, to_insert.1);
                to_insert = low_last;
                is_last_of_lower_cache = true;
            }
        }

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return;
        }

        let middle_last = self.middle.last_entry().unwrap();
        if is_last_of_lower_cache || &to_insert.0 < middle_last.key() {
            // Row enters the result window; the window's largest entry spills toward
            // `high`. Both sides of the swap are reported to `staging`.
            let middle_last = middle_last.remove_entry();
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());

            staging.delete(middle_last.0.clone(), middle_last.1.clone());
            staging.insert(to_insert.0, to_insert.1);

            to_insert = middle_last;
            is_last_of_lower_cache = true;
        }

        // Only touch `high` when it is known to reflect the state table, or when the
        // entry was just evicted from `middle` (so it belongs directly after it).
        if is_last_of_lower_cache || self.high_is_synced() {
            if self.high.is_empty() {
                self.high.insert(to_insert.0, to_insert.1);
                return;
            }

            let high_is_full = self.high_is_full();
            let high_last = self.high.last_entry().unwrap();

            if is_last_of_lower_cache || &to_insert.0 < high_last.key() {
                if high_is_full {
                    // Evict the largest entry to respect `high_cache_capacity`.
                    high_last.remove_entry();
                }
                self.high.insert(to_insert.0, to_insert.1);
            }
        }
    }

    /// Deletes a row from whichever level holds it, then refills `middle` from
    /// `high` (and `low` from `middle`) to keep the result window populated. When
    /// `high` may be out of sync with the state table, it is re-filled from
    /// `managed_state` before being used as the refill source.
    async fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row + Send,
        staging: &mut TopNStaging,
    ) -> StreamExecutorResult<()> {
        if !enable_strict_consistency() && self.table_row_count == Some(0) {
            // In non-strict mode, tolerate the inconsistency: drop the row-count hint
            // so the decrement below is skipped (it would underflow otherwise).
            consistency_error!("table row count is 0, but we receive a DELETE operation");
            self.table_row_count = None;
        }
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count -= 1;
        }

        if self.middle_is_full() && &cache_key > self.middle.last_key_value().unwrap().0 {
            // Row is beyond the result window; it can only live in `high` (if cached).
            self.high.remove(&cache_key);
        } else if self.low_is_full()
            && self
                .low
                .as_ref()
                .map(|low| &cache_key > low.last_key_value().unwrap().0)
                .unwrap_or(
                    // No low cache means no OFFSET: the row belongs to `middle`.
                    true,
                )
        {
            // Row falls inside the result window: remove it from `middle`.
            let removed = self.middle.remove(&cache_key);
            staging.delete(cache_key.clone(), (&row).into());

            if removed.is_none() {
                consistency_error!(
                    ?group_key,
                    ?cache_key,
                    "cache key not found in middle cache"
                );
                return Ok(());
            }

            // Make sure `high` reflects the state table before refilling from it.
            if !self.high_is_synced() {
                self.high.clear();
                managed_state
                    .fill_high_cache(
                        group_key,
                        self,
                        self.last_cache_key_before_high().cloned(),
                        self.high_cache_capacity,
                    )
                    .await?;
            }

            // Backfill the freed slot in the result window from `high`.
            if !self.high.is_empty() {
                let high_first = self.high.pop_first().unwrap();
                self.middle
                    .insert(high_first.0.clone(), high_first.1.clone());
                staging.insert(high_first.0, high_first.1);
            }

            assert!(self.high.is_empty() || self.middle.len() == self.limit);
        } else {
            // Row falls inside the OFFSET range: remove it from `low`, then shift one
            // row down from `middle` (and, transitively, one from `high` into `middle`).
            let low = self.low.as_mut().unwrap();
            let removed = low.remove(&cache_key);

            if removed.is_none() {
                consistency_error!(?group_key, ?cache_key, "cache key not found in low cache");
                return Ok(());
            }

            if !self.middle.is_empty() {
                let middle_first = self.middle.pop_first().unwrap();
                staging.delete(middle_first.0.clone(), middle_first.1.clone());
                low.insert(middle_first.0, middle_first.1);

                if !self.high_is_synced() {
                    self.high.clear();
                    managed_state
                        .fill_high_cache(
                            group_key,
                            self,
                            self.last_cache_key_before_high().cloned(),
                            self.high_cache_capacity,
                        )
                        .await?;
                }

                if !self.high.is_empty() {
                    let high_first = self.high.pop_first().unwrap();
                    self.middle
                        .insert(high_first.0.clone(), high_first.1.clone());
                    staging.insert(high_first.0, high_first.1);
                }
            }
        }

        Ok(())
    }
}
483
impl TopNCacheTrait for TopNCache<true> {
    /// Inserts a row under WITH TIES semantics: `middle` may hold more than `limit`
    /// rows when the last rank is tied, and rows sharing a sort key always move
    /// between `middle` and `high` as a whole group.
    fn insert(&mut self, cache_key: CacheKey, row: impl Row + Send, staging: &mut TopNStaging) {
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count += 1;
        }

        assert!(
            self.low.is_none(),
            "Offset is not supported yet for WITH TIES, so low cache should be None"
        );

        let to_insert: (CacheKey, CompactedRow) = (cache_key, (&row).into());

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0.clone(), to_insert.1);
            return;
        }

        // First component of the cache key is the memcomparable sort (ORDER BY) key;
        // tied rows compare equal on it.
        let to_insert_sort_key = &(to_insert.0).0;
        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();

        match to_insert_sort_key.cmp(&middle_last_sort_key) {
            Ordering::Less => {
                // How many rows are tied with the current last rank.
                let n_ties_of_last = self
                    .middle
                    .range((middle_last_sort_key.clone(), vec![])..)
                    .count();
                // If after this insert dropping the whole tied group still leaves at
                // least `limit` rows, the group leaves the window and spills to `high`.
                if self.middle.len() + 1 - n_ties_of_last >= self.limit {
                    while let Some(middle_last) = self.middle.last_entry()
                        && middle_last.key().0 == middle_last_sort_key
                    {
                        let middle_last = middle_last.remove_entry();
                        staging.delete(middle_last.0.clone(), middle_last.1.clone());
                        self.high.insert(middle_last.0, middle_last.1);
                    }
                }
                // Shrink `high` by whole tie groups so a group is never split.
                if self.high.len() > self.high_cache_capacity {
                    let high_last = self.high.pop_last().unwrap();
                    let high_last_sort_key = (high_last.0).0;
                    self.high.retain(|k, _| k.0 != high_last_sort_key);
                }

                self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
                staging.insert(to_insert.0, to_insert.1);
            }
            Ordering::Equal => {
                // Tied with the current last rank: it joins the result window.
                self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
                staging.insert(to_insert.0, to_insert.1);
            }
            Ordering::Greater => {
                // Beyond the result window: only track it if `high` is trustworthy.
                if self.high_is_synced() {
                    if self.high.is_empty() {
                        self.high.insert(to_insert.0, to_insert.1);
                        return;
                    }

                    if to_insert_sort_key <= &self.high.last_key().unwrap().0 {
                        self.high.insert(to_insert.0, to_insert.1);
                    }

                    // Evict by whole tie groups when over capacity.
                    if self.high.len() > self.high_cache_capacity {
                        let high_last = self.high.pop_last().unwrap();
                        let high_last_sort_key = (high_last.0).0;
                        self.high.retain(|k, _| k.0 != high_last_sort_key);
                    }
                }
            }
        }
    }

    /// Deletes a row under WITH TIES semantics; when `middle` drops below `limit`,
    /// it is refilled from `high` with the entire next tie group.
    async fn delete<S: StateStore>(
        &mut self,
        group_key: Option<impl GroupKey>,
        managed_state: &mut ManagedTopNState<S>,
        cache_key: CacheKey,
        row: impl Row + Send,
        staging: &mut TopNStaging,
    ) -> StreamExecutorResult<()> {
        if !enable_strict_consistency() && self.table_row_count == Some(0) {
            // Tolerate the inconsistency in non-strict mode by dropping the row-count
            // hint. NOTE(review): unlike the non-WITH-TIES impl, no `consistency_error!`
            // is emitted here — confirm whether that is intentional.
            self.table_row_count = None;
        }
        if let Some(row_count) = self.table_row_count.as_mut() {
            *row_count -= 1;
        }

        assert!(
            self.low.is_none(),
            "Offset is not supported yet for WITH TIES, so low cache should be None"
        );

        if self.middle.is_empty() {
            consistency_error!(
                ?group_key,
                ?cache_key,
                "middle cache is empty, but we receive a DELETE operation"
            );
            // Best effort: still propagate the deletion downstream.
            staging.delete(cache_key, (&row).into());
            return Ok(());
        }

        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();

        let to_delete_sort_key = cache_key.0.clone();
        if to_delete_sort_key > middle_last_sort_key {
            // Row is beyond the result window; it can only live in `high` (if cached).
            self.high.remove(&cache_key);
        } else {
            self.middle.remove(&cache_key);
            staging.delete(cache_key.clone(), (&row).into());
            if self.middle.len() >= self.limit {
                // Ties keep the window at or above `limit`; no refill needed.
                return Ok(());
            }

            // Make sure `high` reflects the state table before refilling from it.
            if !self.high_is_synced() {
                managed_state
                    .fill_high_cache(
                        group_key,
                        self,
                        self.last_cache_key_before_high().cloned(),
                        self.high_cache_capacity,
                    )
                    .await?;
            }

            // Move the next tie group — all rows in `high` sharing the smallest sort
            // key — into `middle`.
            if !self.high.is_empty() {
                let high_first = self.high.pop_first().unwrap();
                let high_first_sort_key = (high_first.0).0.clone();
                assert!(high_first_sort_key > middle_last_sort_key);

                self.middle
                    .insert(high_first.0.clone(), high_first.1.clone());
                staging.insert(high_first.0, high_first.1);

                for (cache_key, row) in self.high.extract_if(|k, _| k.0 == high_first_sort_key) {
                    self.middle.insert(cache_key.clone(), row.clone());
                    staging.insert(cache_key, row);
                }
            }
        }

        Ok(())
    }
}
664
/// Insert-only variant of [`TopNCacheTrait`] for append-only TopN. Since rows are
/// never deleted upstream, the cache also maintains the state table directly (hence
/// the extra `managed_state` / `row_deserializer` parameters) and can discard rows
/// that will never enter the TopN result.
pub trait AppendOnlyTopNCacheTrait {
    /// Inserts `row_ref` into the cache and, when it may still affect the result,
    /// into the state table; effective output changes are recorded in `staging`.
    /// Rows evicted from the result window are deleted from the state table via
    /// `managed_state`, using `row_deserializer` to reconstruct them.
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()>;
}
683
impl AppendOnlyTopNCacheTrait for TopNCache<false> {
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()> {
        // Append-only: a row ranked at or after the last entry of a full window can
        // never enter the TopN later, so skip it without touching the state table.
        if self.middle_is_full() && &cache_key >= self.middle.last_key().unwrap() {
            return Ok(());
        }
        managed_state.insert(row_ref);

        let mut to_insert = (cache_key, row_ref.into());

        let low_is_full = self.low_is_full();
        if let Some(low) = &mut self.low {
            if !low_is_full {
                // Still filling the OFFSET range; nothing visible changes downstream.
                low.insert(to_insert.0, to_insert.1);
                return Ok(());
            }

            // New row lands in `low`; its largest entry spills toward `middle`.
            let low_last = low.last_entry().unwrap();
            if &to_insert.0 < low_last.key() {
                let low_last = low_last.remove_entry();
                low.insert(to_insert.0, to_insert.1);
                to_insert = low_last;
            }
        }

        if !self.middle_is_full() {
            self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
            staging.insert(to_insert.0, to_insert.1);
            return Ok(());
        }

        // Window is full: the evicted largest row is gone for good (append-only, no
        // later deletion can bring it back), so it is removed from the state table
        // instead of spilling into `high`.
        let middle_last = self.middle.pop_last().unwrap();
        debug_assert!(to_insert.0 < middle_last.0);
        managed_state.delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
        staging.delete(middle_last.0, middle_last.1);

        self.middle.insert(to_insert.0.clone(), to_insert.1.clone());
        staging.insert(to_insert.0, to_insert.1);

        Ok(())
    }
}
743
impl AppendOnlyTopNCacheTrait for TopNCache<true> {
    fn insert<S: StateStore>(
        &mut self,
        cache_key: CacheKey,
        row_ref: RowRef<'_>,
        staging: &mut TopNStaging,
        managed_state: &mut ManagedTopNState<S>,
        row_deserializer: &RowDeserializer,
    ) -> StreamExecutorResult<()> {
        assert!(
            self.low.is_none(),
            "Offset is not supported yet for WITH TIES, so low cache should be empty"
        );

        let to_insert = (cache_key, row_ref);

        if !self.middle_is_full() {
            managed_state.insert(to_insert.1);
            let row: CompactedRow = to_insert.1.into();
            self.middle.insert(to_insert.0.clone(), row.clone());
            staging.insert(to_insert.0, row);
            return Ok(());
        }

        // First component of the cache key is the memcomparable sort (ORDER BY) key;
        // tied rows compare equal on it.
        let to_insert_sort_key = &(to_insert.0).0;
        let middle_last_sort_key = self.middle.last_key().unwrap().0.clone();

        match to_insert_sort_key.cmp(&middle_last_sort_key) {
            Ordering::Less => {
                // How many rows are tied with the current last rank.
                let n_ties_of_last = self
                    .middle
                    .range((middle_last_sort_key.clone(), vec![])..)
                    .count();
                // If after this insert dropping the whole tied group still leaves at
                // least `limit` rows, the group leaves the window; append-only, so it
                // is deleted from the state table rather than spilled to `high`.
                if self.middle.len() + 1 - n_ties_of_last >= self.limit {
                    while let Some(middle_last) = self.middle.last_entry()
                        && middle_last.key().0 == middle_last_sort_key
                    {
                        let middle_last = middle_last.remove_entry();
                        managed_state
                            .delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
                        staging.delete(middle_last.0, middle_last.1);
                    }
                }

                managed_state.insert(to_insert.1);
                let row: CompactedRow = to_insert.1.into();
                self.middle.insert(to_insert.0.clone(), row.clone());
                staging.insert(to_insert.0, row);
            }
            Ordering::Equal => {
                // Tied with the current last rank: it joins the result window.
                managed_state.insert(to_insert.1);
                let row: CompactedRow = to_insert.1.into();
                self.middle.insert(to_insert.0.clone(), row.clone());
                staging.insert(to_insert.0, row);
            }
            Ordering::Greater => {
                // Beyond the tied last rank: append-only, so the row can never enter
                // the window later — skip it entirely (not even stored in state).
            }
        }

        Ok(())
    }
}
822
/// Staging area that collects per-row cache changes and folds them into net
/// `Delete` / `Insert` / `Update` changes keyed by [`CacheKey`], to be emitted as a
/// single batch via [`TopNStaging::into_changes`].
#[derive(Debug, Default)]
pub struct TopNStaging {
    // Net deletions: key was present before and is gone now (stores the old row).
    to_delete: BTreeMap<CacheKey, CompactedRow>,
    // Net insertions: key was absent before and is present now.
    to_insert: BTreeMap<CacheKey, CompactedRow>,
    // Net updates: key present before and after with changed content,
    // stored as `(old_row, new_row)`.
    to_update: BTreeMap<CacheKey, (CompactedRow, CompactedRow)>,
}
831
832impl TopNStaging {
833 pub fn new() -> Self {
834 Self::default()
835 }
836
837 fn insert(&mut self, cache_key: CacheKey, row: CompactedRow) {
840 if let Some(old_row) = self.to_delete.remove(&cache_key) {
841 if old_row != row {
842 self.to_update.insert(cache_key, (old_row, row));
843 }
844 } else {
845 self.to_insert.insert(cache_key, row);
846 }
847 }
848
849 fn delete(&mut self, cache_key: CacheKey, row: CompactedRow) {
852 if self.to_insert.remove(&cache_key).is_some() {
853 } else if let Some((old_row, _)) = self.to_update.remove(&cache_key) {
855 self.to_delete.insert(cache_key, old_row);
856 } else {
857 self.to_delete.insert(cache_key, row);
858 }
859 }
860
861 pub fn len(&self) -> usize {
863 self.to_delete.len() + self.to_insert.len() + self.to_update.len()
864 }
865
866 pub fn is_empty(&self) -> bool {
868 self.to_delete.is_empty() && self.to_insert.is_empty() && self.to_update.is_empty()
869 }
870
871 pub fn into_changes(self) -> impl Iterator<Item = (Op, CompactedRow)> {
873 #[cfg(debug_assertions)]
874 {
875 let keys = self
876 .to_delete
877 .keys()
878 .chain(self.to_insert.keys())
879 .chain(self.to_update.keys())
880 .unique()
881 .count();
882 assert_eq!(
883 keys,
884 self.to_delete.len() + self.to_insert.len() + self.to_update.len(),
885 "should not have duplicate keys with different operations",
886 );
887 }
888
889 self.to_update
894 .into_values()
895 .flat_map(|(old_row, new_row)| {
896 [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)]
897 })
898 .chain(self.to_delete.into_values().map(|row| (Op::Delete, row)))
899 .chain(self.to_insert.into_values().map(|row| (Op::Insert, row)))
900 }
901
902 pub fn into_deserialized_changes(
904 self,
905 deserializer: &RowDeserializer,
906 ) -> impl Iterator<Item = StreamExecutorResult<(Op, OwnedRow)>> + '_ {
907 self.into_changes()
908 .map(|(op, row)| Ok((op, deserializer.deserialize(row.row.as_ref())?)))
909 }
910}
911
#[cfg(test)]
mod tests {
    use risingwave_common::types::DataType;

    use super::*;

    /// Schema shared by all capacity tests.
    fn int32_schema() -> Vec<DataType> {
        vec![DataType::Int32]
    }

    #[test]
    fn test_topn_cache_new_uses_default_min_capacity() {
        // `new` delegates to `with_min_capacity` with the module-wide default.
        let cache = TopNCache::<false>::new(0, 5, int32_schema());
        assert_eq!(cache.high_cache_capacity, TOPN_CACHE_MIN_CAPACITY);
    }

    #[test]
    fn test_topn_cache_with_custom_min_capacity() {
        // A custom minimum larger than (offset + limit) * factor wins.
        let min_cap = 25;
        let cache = TopNCache::<false>::with_min_capacity(0, 5, int32_schema(), min_cap);
        assert_eq!(cache.high_cache_capacity, min_cap);
    }

    #[test]
    fn test_topn_cache_high_capacity_factor_respected() {
        // With a tiny minimum, the capacity comes from the formula.
        let (offset, limit) = (2, 5);
        let cache = TopNCache::<false>::with_min_capacity(offset, limit, int32_schema(), 1);
        assert_eq!(
            cache.high_cache_capacity,
            (offset + limit) * TOPN_CACHE_HIGH_CAPACITY_FACTOR
        );
    }

    #[test]
    fn test_topn_cache_min_capacity_takes_precedence_when_larger() {
        let (offset, limit) = (0, 5);
        let min_cap = 100;
        let formula = (offset + limit) * TOPN_CACHE_HIGH_CAPACITY_FACTOR;

        let cache = TopNCache::<false>::with_min_capacity(offset, limit, int32_schema(), min_cap);
        assert_eq!(cache.high_cache_capacity, min_cap);
        assert!(cache.high_cache_capacity > formula);
    }
}