use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::ops::{Bound, RangeInclusive};

use delta_btree_map::{Change, DeltaBTreeMap};
use educe::Educe;
use futures_async_stream::for_await;
use risingwave_common::array::stream_record::Record;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
use risingwave_common::types::{Datum, Sentinelled};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
use risingwave_expr::window_function::{StateKey, WindowStates, create_window_state};
use risingwave_storage::StateStore;
use risingwave_storage::store::PrefetchOptions;
use static_assertions::const_assert;

use super::general::{Calls, RowConverter};
use crate::common::table::state_table::StateTable;
use crate::consistency::{consistency_error, enable_strict_consistency};
use crate::executor::StreamExecutorResult;
use crate::executor::over_window::frame_finder::*;

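/// Key type of the partition range cache. A `CacheKey` is either a normal `StateKey`, or one
/// of the two sentinels (`Smallest` / `Largest`) marking a cache boundary that is not yet
/// loaded from the state table.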
pub(super) type CacheKey = Sentinelled<StateKey>;

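/// Range cache for one over-window partition: an ordered map from `CacheKey` to the
/// corresponding input row, with estimated-size tracking for memory accounting.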
pub(super) type PartitionCache = EstimatedBTreeMap<CacheKey, OwnedRow>;

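/// Changes (inserts/deletes) to be applied to one partition, keyed by `CacheKey`.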
pub(super) type PartitionDelta = BTreeMap<CacheKey, Change<OwnedRow>>;

pub(super) fn new_empty_partition_cache() -> PartitionCache {
    let mut cache = PartitionCache::new();
    cache.insert(CacheKey::Smallest, OwnedRow::empty());
    cache.insert(CacheKey::Largest, OwnedRow::empty());
    cache
}

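// Heuristic sizes for cache shrinking: keep roughly `MAGIC_CACHE_SIZE` entries, with a
// `MAGIC_JITTER_PREVENTION` margin so that entries near the retained boundary are not
// repeatedly evicted and reloaded (cache jitter).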
const MAGIC_CACHE_SIZE: usize = 1024;
const MAGIC_JITTER_PREVENTION: usize = MAGIC_CACHE_SIZE / 8;

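/// Shrink the partition range cache according to the given `cache_policy`, keeping the
/// entries around `recently_accessed_range` and re-inserting sentinel keys for whichever
/// side of the cache gets evicted.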
pub(super) fn shrink_partition_cache(
    deduped_part_key: &OwnedRow,
    range_cache: &mut PartitionCache,
    cache_policy: CachePolicy,
    recently_accessed_range: RangeInclusive<StateKey>,
) {
    tracing::trace!(
        partition=?deduped_part_key,
        cache_policy=?cache_policy,
        recently_accessed_range=?recently_accessed_range,
        "find the range to retain in the range cache"
    );

    let (start, end) = match cache_policy {
        CachePolicy::Full => {
            return;
        }
        CachePolicy::Recent => {
            let (sk_start, sk_end) = recently_accessed_range.into_inner();
            let (ck_start, ck_end) = (CacheKey::from(sk_start), CacheKey::from(sk_end));

            let mut cursor = range_cache.inner().upper_bound(Bound::Excluded(&ck_start));
            for _ in 0..MAGIC_JITTER_PREVENTION {
                if cursor.prev().is_none() {
                    break;
                }
            }
            let start = cursor
                .peek_prev()
                .map(|(k, _)| k)
                .unwrap_or_else(|| range_cache.first_key_value().unwrap().0)
                .clone();

            let mut cursor = range_cache.inner().lower_bound(Bound::Excluded(&ck_end));
            for _ in 0..MAGIC_JITTER_PREVENTION {
                if cursor.next().is_none() {
                    break;
                }
            }
            let end = cursor
                .peek_next()
                .map(|(k, _)| k)
                .unwrap_or_else(|| range_cache.last_key_value().unwrap().0)
                .clone();

            (start, end)
        }
        CachePolicy::RecentFirstN => {
            if range_cache.len() <= MAGIC_CACHE_SIZE {
                return;
            } else {
                let (sk_start, _sk_end) = recently_accessed_range.into_inner();
                let ck_start = CacheKey::from(sk_start);

                let mut capacity_remain = MAGIC_CACHE_SIZE;
                const_assert!(MAGIC_JITTER_PREVENTION < MAGIC_CACHE_SIZE);

                let cursor_just_before_ck_start =
                    range_cache.inner().upper_bound(Bound::Excluded(&ck_start));

                let mut cursor = cursor_just_before_ck_start.clone();
                for _ in 0..MAGIC_JITTER_PREVENTION {
                    if cursor.prev().is_none() {
                        break;
                    }
                    capacity_remain -= 1;
                }
                let start = cursor
                    .peek_prev()
                    .map(|(k, _)| k)
                    .unwrap_or_else(|| range_cache.first_key_value().unwrap().0)
                    .clone();

                let mut cursor = cursor_just_before_ck_start;
                for _ in 0..capacity_remain {
                    if cursor.next().is_none() {
                        break;
                    }
                }
                let end = cursor
                    .peek_next()
                    .map(|(k, _)| k)
                    .unwrap_or_else(|| range_cache.last_key_value().unwrap().0)
                    .clone();

                (start, end)
            }
        }
        CachePolicy::RecentLastN => {
            if range_cache.len() <= MAGIC_CACHE_SIZE {
                return;
            } else {
                let (_sk_start, sk_end) = recently_accessed_range.into_inner();
                let ck_end = CacheKey::from(sk_end);

                let mut capacity_remain = MAGIC_CACHE_SIZE;
                const_assert!(MAGIC_JITTER_PREVENTION < MAGIC_CACHE_SIZE);

                let cursor_just_after_ck_end =
                    range_cache.inner().lower_bound(Bound::Excluded(&ck_end));

                let mut cursor = cursor_just_after_ck_end.clone();
                for _ in 0..MAGIC_JITTER_PREVENTION {
                    if cursor.next().is_none() {
                        break;
                    }
                    capacity_remain -= 1;
                }
                let end = cursor
                    .peek_next()
                    .map(|(k, _)| k)
                    .unwrap_or_else(|| range_cache.last_key_value().unwrap().0)
                    .clone();

                let mut cursor = cursor_just_after_ck_end;
                for _ in 0..capacity_remain {
                    if cursor.prev().is_none() {
                        break;
                    }
                }
                let start = cursor
                    .peek_prev()
                    .map(|(k, _)| k)
                    .unwrap_or_else(|| range_cache.first_key_value().unwrap().0)
                    .clone();

                (start, end)
            }
        }
    };

    tracing::trace!(
        partition=?deduped_part_key,
        retain_range=?(&start..=&end),
        "retain range in the range cache"
    );

    let (left_removed, right_removed) = range_cache.retain_range(&start..=&end);
    if range_cache.is_empty() {
        if !left_removed.is_empty() || !right_removed.is_empty() {
            range_cache.insert(CacheKey::Smallest, OwnedRow::empty());
            range_cache.insert(CacheKey::Largest, OwnedRow::empty());
        }
    } else {
        if !left_removed.is_empty() {
            range_cache.insert(CacheKey::Smallest, OwnedRow::empty());
        }
        if !right_removed.is_empty() {
            range_cache.insert(CacheKey::Largest, OwnedRow::empty());
        }
    }
}

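/// Statistics collected while operating on one partition, later merged into executor metrics.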
#[derive(Default, Debug)]
pub(super) struct OverPartitionStats {
    pub lookup_count: u64,
    pub left_miss_count: u64,
    pub right_miss_count: u64,

    pub accessed_entry_count: u64,
    pub compute_count: u64,
    pub same_output_count: u64,
}

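/// A contiguous range of keys affected by a delta, satisfying
/// `first_frame_start <= first_curr_key <= last_curr_key <= last_frame_end`.
/// Rows in `first_curr_key..=last_curr_key` need their output recomputed, and the
/// surrounding frame-boundary keys are needed as input for that computation.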
#[derive(Debug, Educe)]
#[educe(Clone, Copy)]
pub(super) struct AffectedRange<'a> {
    pub first_frame_start: &'a CacheKey,
    pub first_curr_key: &'a CacheKey,
    pub last_curr_key: &'a CacheKey,
    pub last_frame_end: &'a CacheKey,
}

impl<'a> AffectedRange<'a> {
    fn new(
        first_frame_start: &'a CacheKey,
        first_curr_key: &'a CacheKey,
        last_curr_key: &'a CacheKey,
        last_frame_end: &'a CacheKey,
    ) -> Self {
        Self {
            first_frame_start,
            first_curr_key,
            last_curr_key,
            last_frame_end,
        }
    }
}

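/// A wrapper over the per-partition range cache that also knows how to reach the state
/// table, so that missing parts of the partition can be loaded into the cache on demand.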
pub(super) struct OverPartition<'a, S: StateStore> {
    deduped_part_key: &'a OwnedRow,
    range_cache: &'a mut PartitionCache,
    cache_policy: CachePolicy,

    calls: &'a Calls,
    row_conv: RowConverter<'a>,

    stats: OverPartitionStats,

    _phantom: PhantomData<S>,
}

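/// Number of rows loaded from the state table in one batch when extending the cache
/// leftward or rightward.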
const MAGIC_BATCH_SIZE: usize = 512;

impl<'a, S: StateStore> OverPartition<'a, S> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        deduped_part_key: &'a OwnedRow,
        cache: &'a mut PartitionCache,
        cache_policy: CachePolicy,
        calls: &'a Calls,
        row_conv: RowConverter<'a>,
    ) -> Self {
        Self {
            deduped_part_key,
            range_cache: cache,
            cache_policy,

            calls,
            row_conv,

            stats: Default::default(),

            _phantom: PhantomData,
        }
    }

    pub fn summarize(self) -> OverPartitionStats {
        self.stats
    }

    pub fn cache_real_len(&self) -> usize {
        let len = self.range_cache.inner().len();
        if len <= 1 {
            debug_assert!(
                self.range_cache
                    .inner()
                    .first_key_value()
                    .map(|(k, _)| k.is_normal())
                    .unwrap_or(true)
            );
            return len;
        }
        let cache_inner = self.range_cache.inner();
        let sentinels = [
            cache_inner.first_key_value().unwrap().0.is_sentinel(),
            cache_inner.last_key_value().unwrap().0.is_sentinel(),
        ];
        len - sentinels.into_iter().filter(|x| *x).count()
    }

    fn cache_real_first_key(&self) -> Option<&StateKey> {
        self.range_cache
            .inner()
            .iter()
            .find(|(k, _)| k.is_normal())
            .map(|(k, _)| k.as_normal_expect())
    }

    fn cache_real_last_key(&self) -> Option<&StateKey> {
        self.range_cache
            .inner()
            .iter()
            .rev()
            .find(|(k, _)| k.is_normal())
            .map(|(k, _)| k.as_normal_expect())
    }

    fn cache_left_is_sentinel(&self) -> bool {
        self.range_cache
            .first_key_value()
            .map(|(k, _)| k.is_sentinel())
            .unwrap_or(false)
    }

    fn cache_right_is_sentinel(&self) -> bool {
        self.range_cache
            .last_key_value()
            .map(|(k, _)| k.is_sentinel())
            .unwrap_or(false)
    }

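    /// Build the final change records for this partition from the given `delta`, loading more
    /// rows from the state table into the cache as needed. Returns the per-key records together
    /// with the key range that was accessed (useful for cache shrinking afterwards).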
    pub async fn build_changes(
        &mut self,
        table: &StateTable<S>,
        mut delta: PartitionDelta,
    ) -> StreamExecutorResult<(
        BTreeMap<StateKey, Record<OwnedRow>>,
        Option<RangeInclusive<StateKey>>,
    )> {
        let calls = self.calls;
        let input_schema_len = table.get_data_types().len() - calls.len();
        let numbering_only = calls.numbering_only;
        let has_rank = calls.has_rank;

        let mut part_changes = BTreeMap::new();
        let mut accessed_range: Option<RangeInclusive<StateKey>> = None;

        let mut accessed_entry_count = 0;
        let mut compute_count = 0;
        let mut same_output_count = 0;

        let (part_with_delta, affected_ranges) =
            self.find_affected_ranges(table, &mut delta).await?;

        let snapshot = part_with_delta.snapshot();
        let delta = part_with_delta.delta();
        let last_delta_key = delta.last_key_value().map(|(k, _)| k.as_normal_expect());

        for (key, change) in delta {
            if change.is_delete() {
                part_changes.insert(
                    key.as_normal_expect().clone(),
                    Record::Delete {
                        old_row: snapshot.get(key).unwrap().clone(),
                    },
                );
            }
        }

        for AffectedRange {
            first_frame_start,
            first_curr_key,
            last_curr_key,
            last_frame_end,
        } in affected_ranges
        {
            assert!(first_frame_start <= first_curr_key);
            assert!(first_curr_key <= last_curr_key);
            assert!(last_curr_key <= last_frame_end);
            assert!(first_frame_start.is_normal());
            assert!(first_curr_key.is_normal());
            assert!(last_curr_key.is_normal());
            assert!(last_frame_end.is_normal());

            let last_delta_key = last_delta_key.unwrap();

            if let Some(accessed_range) = accessed_range.as_mut() {
                let min_start = first_frame_start
                    .as_normal_expect()
                    .min(accessed_range.start())
                    .clone();
                let max_end = last_frame_end
                    .as_normal_expect()
                    .max(accessed_range.end())
                    .clone();
                *accessed_range = min_start..=max_end;
            } else {
                accessed_range = Some(
                    first_frame_start.as_normal_expect().clone()
                        ..=last_frame_end.as_normal_expect().clone(),
                );
            }

            let mut states =
                WindowStates::new(calls.iter().map(create_window_state).try_collect()?);

            {
                let mut cursor = part_with_delta
                    .before(first_frame_start)
                    .expect("first frame start key must exist");

                while let Some((key, row)) = cursor.next() {
                    accessed_entry_count += 1;

                    for (call, state) in calls.iter().zip_eq_fast(states.iter_mut()) {
                        state.append(
                            key.as_normal_expect().clone(),
                            row.project(call.args.val_indices())
                                .into_owned_row()
                                .as_inner()
                                .into(),
                        );
                    }

                    if key == last_frame_end {
                        break;
                    }
                }
            }

            states.just_slide_to(first_curr_key.as_normal_expect())?;
            let mut curr_key_cursor = part_with_delta.before(first_curr_key).unwrap();
            assert_eq!(
                states.curr_key(),
                curr_key_cursor
                    .peek_next()
                    .map(|(k, _)| k)
                    .map(CacheKey::as_normal_expect)
            );

            while let Some((key, row)) = curr_key_cursor.next() {
                let mut should_stop = false;

                let output = states.slide_no_evict_hint()?;
                compute_count += 1;

                let old_output = &row.as_inner()[input_schema_len..];
                if !old_output.is_empty() && old_output == output {
                    same_output_count += 1;

                    if numbering_only {
                        if has_rank {
                            if key.as_normal_expect().order_key > last_delta_key.order_key {
                                should_stop = true;
                            }
                        } else if key.as_normal_expect() >= last_delta_key {
                            should_stop = true;
                        }
                    }
                }

                let new_row = OwnedRow::new(
                    row.as_inner()
                        .iter()
                        .take(input_schema_len)
                        .cloned()
                        .chain(output)
                        .collect(),
                );

                if let Some(old_row) = snapshot.get(key).cloned() {
                    if old_row != new_row {
                        part_changes.insert(
                            key.as_normal_expect().clone(),
                            Record::Update { old_row, new_row },
                        );
                    }
                } else {
                    part_changes.insert(key.as_normal_expect().clone(), Record::Insert { new_row });
                }

                if should_stop || key == last_curr_key {
                    break;
                }
            }
        }

        self.stats.accessed_entry_count += accessed_entry_count;
        self.stats.compute_count += compute_count;
        self.stats.same_output_count += same_output_count;

        Ok((part_changes, accessed_range))
    }

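    /// Write a change record to the state table and keep the partition cache in sync:
    /// inserts/updates are also written into the cache, deletes are removed from it, and
    /// the empty-cache sentinel pair is restored when the last normal entry is removed.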
    pub fn write_record(
        &mut self,
        table: &mut StateTable<S>,
        key: StateKey,
        record: Record<OwnedRow>,
    ) {
        table.write_record(record.as_ref());
        match record {
            Record::Insert { new_row } | Record::Update { new_row, .. } => {
                self.range_cache.insert(CacheKey::from(key), new_row);
            }
            Record::Delete { .. } => {
                self.range_cache.remove(&CacheKey::from(key));

                if self.cache_real_len() == 0 && self.range_cache.len() == 1 {
                    self.range_cache
                        .insert(CacheKey::Smallest, OwnedRow::empty());
                    self.range_cache
                        .insert(CacheKey::Largest, OwnedRow::empty());
                }
            }
        }
    }

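    /// Find all ranges in the partition that are affected by the given `delta`, extending the
    /// cache from the state table until every affected range is fully covered by cached
    /// entries. Returns the combined cache-plus-delta view and the affected ranges found in it.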
    async fn find_affected_ranges<'s, 'delta>(
        &'s mut self,
        table: &StateTable<S>,
        delta: &'delta mut PartitionDelta,
    ) -> StreamExecutorResult<(
        DeltaBTreeMap<'delta, CacheKey, OwnedRow>,
        Vec<AffectedRange<'delta>>,
    )>
    where
        'a: 'delta,
        's: 'delta,
    {
        if delta.is_empty() {
            return Ok((DeltaBTreeMap::new(self.range_cache.inner(), delta), vec![]));
        }

        self.ensure_delta_in_cache(table, delta).await?;
        let delta = &*delta;

        let delta_first = delta.first_key_value().unwrap().0.as_normal_expect();
        let delta_last = delta.last_key_value().unwrap().0.as_normal_expect();

        let range_frame_logical_curr =
            calc_logical_curr_for_range_frames(&self.calls.range_frames, delta_first, delta_last);

        loop {
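            // SAFETY (assumed rationale): the raw-pointer cast detaches the shared borrow of
            // `self.range_cache` from `self`, so that `self` can still be mutably borrowed
            // below to extend the cache. Whenever the cache is extended in this iteration, the
            // `part_with_delta` built from the detached reference is discarded and rebuilt in
            // the next iteration of the loop.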
            let cache_inner = unsafe { &*(self.range_cache.inner() as *const _) };
            let part_with_delta = DeltaBTreeMap::new(cache_inner, delta);

            self.stats.lookup_count += 1;
            let res = self
                .find_affected_ranges_readonly(part_with_delta, range_frame_logical_curr.as_ref());

            let (need_extend_leftward, need_extend_rightward) = match res {
                Ok(ranges) => return Ok((part_with_delta, ranges)),
                Err(cache_extend_hint) => cache_extend_hint,
            };

            if need_extend_leftward {
                self.stats.left_miss_count += 1;
                tracing::trace!(partition=?self.deduped_part_key, "partition cache left extension triggered");
                let left_most = self.cache_real_first_key().unwrap_or(delta_first).clone();
                self.extend_cache_leftward_by_n(table, &left_most).await?;
            }
            if need_extend_rightward {
                self.stats.right_miss_count += 1;
                tracing::trace!(partition=?self.deduped_part_key, "partition cache right extension triggered");
                let right_most = self.cache_real_last_key().unwrap_or(delta_last).clone();
                self.extend_cache_rightward_by_n(table, &right_most).await?;
            }
            tracing::trace!(partition=?self.deduped_part_key, "partition cache extended");
        }
    }

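    /// Make sure all keys touched by the `delta` are present in the cache, loading the
    /// corresponding rows from the state table if necessary. When strict consistency is
    /// disabled, deletions of keys that are not in the cache are dropped from the delta
    /// (with a consistency error reported).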
    async fn ensure_delta_in_cache(
        &mut self,
        table: &StateTable<S>,
        delta: &mut PartitionDelta,
    ) -> StreamExecutorResult<()> {
        if delta.is_empty() {
            return Ok(());
        }

        let delta_first = delta.first_key_value().unwrap().0.as_normal_expect();
        let delta_last = delta.last_key_value().unwrap().0.as_normal_expect();

        if self.cache_policy.is_full() {
            self.extend_cache_to_boundary(table).await?;
        } else {
            self.extend_cache_by_range(table, delta_first..=delta_last)
                .await?;
        }

        if !enable_strict_consistency() {
            let cache = self.range_cache.inner();
            delta.retain(|key, change| match &*change {
                Change::Insert(_) => true,
                Change::Delete => {
                    let consistent = cache.contains_key(key);
                    if !consistent {
                        consistency_error!(?key, "removing a row with non-existing key");
                    }
                    consistent
                }
            });
        }

        Ok(())
    }

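    /// Try to find the affected ranges in the current cache-plus-delta view, without touching
    /// the state table. If any computed boundary lands on a sentinel key, the cache does not
    /// yet cover enough of the partition; in that case the needed extension directions are
    /// returned as `Err((need_extend_leftward, need_extend_rightward))`.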
    fn find_affected_ranges_readonly<'delta>(
        &self,
        part_with_delta: DeltaBTreeMap<'delta, CacheKey, OwnedRow>,
        range_frame_logical_curr: Option<&(Sentinelled<Datum>, Sentinelled<Datum>)>,
    ) -> std::result::Result<Vec<AffectedRange<'delta>>, (bool, bool)> {
        if part_with_delta.first_key().is_none() {
            return Ok(vec![]);
        }

        let delta_first_key = part_with_delta.delta().first_key_value().unwrap().0;
        let delta_last_key = part_with_delta.delta().last_key_value().unwrap().0;
        let cache_key_pk_len = delta_first_key.as_normal_expect().pk.len();

        if part_with_delta.snapshot().is_empty() {
            return Ok(vec![AffectedRange::new(
                delta_first_key,
                delta_first_key,
                delta_last_key,
                delta_last_key,
            )]);
        }

        let first_key = part_with_delta.first_key().unwrap();
        let last_key = part_with_delta.last_key().unwrap();

        let first_curr_key = if self.calls.end_is_unbounded || delta_first_key == first_key {
            first_key
        } else {
            let mut key = find_first_curr_for_rows_frame(
                &self.calls.super_rows_frame_bounds,
                part_with_delta,
                delta_first_key,
            );

            if let Some((logical_first_curr, _)) = range_frame_logical_curr {
                let logical_curr = logical_first_curr.as_normal_expect();
                let new_key = find_left_for_range_frames(
                    &self.calls.range_frames,
                    part_with_delta,
                    logical_curr,
                    cache_key_pk_len,
                );
                key = std::cmp::min(key, new_key);
            }

            key
        };

        let last_curr_key = if self.calls.start_is_unbounded || delta_last_key == last_key {
            last_key
        } else {
            let mut key = find_last_curr_for_rows_frame(
                &self.calls.super_rows_frame_bounds,
                part_with_delta,
                delta_last_key,
            );

            if let Some((_, logical_last_curr)) = range_frame_logical_curr {
                let logical_curr = logical_last_curr.as_normal_expect();
                let new_key = find_right_for_range_frames(
                    &self.calls.range_frames,
                    part_with_delta,
                    logical_curr,
                    cache_key_pk_len,
                );
                key = std::cmp::max(key, new_key);
            }

            key
        };

        {
            let mut need_extend_leftward = false;
            let mut need_extend_rightward = false;
            for key in [first_curr_key, last_curr_key] {
                if key.is_smallest() {
                    need_extend_leftward = true;
                } else if key.is_largest() {
                    need_extend_rightward = true;
                }
            }
            if need_extend_leftward || need_extend_rightward {
                return Err((need_extend_leftward, need_extend_rightward));
            }
        }

        if first_curr_key > last_curr_key {
            return Ok(vec![]);
        }

        let range_frame_logical_boundary = calc_logical_boundary_for_range_frames(
            &self.calls.range_frames,
            first_curr_key.as_normal_expect(),
            last_curr_key.as_normal_expect(),
        );

        let first_frame_start = if self.calls.start_is_unbounded || first_curr_key == first_key {
            first_key
        } else {
            let mut key = find_frame_start_for_rows_frame(
                &self.calls.super_rows_frame_bounds,
                part_with_delta,
                first_curr_key,
            );

            if let Some((logical_first_start, _)) = range_frame_logical_boundary.as_ref() {
                let logical_boundary = logical_first_start.as_normal_expect();
                let new_key = find_left_for_range_frames(
                    &self.calls.range_frames,
                    part_with_delta,
                    logical_boundary,
                    cache_key_pk_len,
                );
                key = std::cmp::min(key, new_key);
            }

            key
        };
        assert!(first_frame_start <= first_curr_key);

        let last_frame_end = if self.calls.end_is_unbounded || last_curr_key == last_key {
            last_key
        } else {
            let mut key = find_frame_end_for_rows_frame(
                &self.calls.super_rows_frame_bounds,
                part_with_delta,
                last_curr_key,
            );

            if let Some((_, logical_last_end)) = range_frame_logical_boundary.as_ref() {
                let logical_boundary = logical_last_end.as_normal_expect();
                let new_key = find_right_for_range_frames(
                    &self.calls.range_frames,
                    part_with_delta,
                    logical_boundary,
                    cache_key_pk_len,
                );
                key = std::cmp::max(key, new_key);
            }

            key
        };
        assert!(last_frame_end >= last_curr_key);

        let mut need_extend_leftward = false;
        let mut need_extend_rightward = false;
        for key in [
            first_curr_key,
            last_curr_key,
            first_frame_start,
            last_frame_end,
        ] {
            if key.is_smallest() {
                need_extend_leftward = true;
            } else if key.is_largest() {
                need_extend_rightward = true;
            }
        }

        if need_extend_leftward || need_extend_rightward {
            Err((need_extend_leftward, need_extend_rightward))
        } else {
            Ok(vec![AffectedRange::new(
                first_frame_start,
                first_curr_key,
                last_curr_key,
                last_frame_end,
            )])
        }
    }

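    /// Load the whole partition from the state table into a fresh cache. A no-op when the
    /// cache contains no sentinel keys, i.e. the whole partition is already cached.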
    async fn extend_cache_to_boundary(
        &mut self,
        table: &StateTable<S>,
    ) -> StreamExecutorResult<()> {
        if self.cache_real_len() == self.range_cache.len() {
            return Ok(());
        }

        tracing::trace!(partition=?self.deduped_part_key, "loading the whole partition into cache");

        let mut new_cache = PartitionCache::new();
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
        let table_iter = table
            .iter_with_prefix(self.deduped_part_key, sub_range, PrefetchOptions::default())
            .await?;

        #[for_await]
        for row in table_iter {
            let row: OwnedRow = row?.into_owned_row();
            new_cache.insert(self.row_conv.row_to_state_key(&row)?.into(), row);
        }
        *self.range_cache = new_cache;

        Ok(())
    }

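    /// Make sure the cache covers the given key `range`: re-initialize the cache if the range
    /// doesn't overlap with what is cached, load whichever half of the range is still behind a
    /// sentinel, then extend by a small batch on both sides.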
    async fn extend_cache_by_range(
        &mut self,
        table: &StateTable<S>,
        range: RangeInclusive<&StateKey>,
    ) -> StreamExecutorResult<()> {
        if self.cache_real_len() == self.range_cache.len() {
            return Ok(());
        }
        assert!(self.range_cache.len() >= 2);

        let cache_real_first_key = self.cache_real_first_key();
        let cache_real_last_key = self.cache_real_last_key();

        if cache_real_first_key.is_some() && *range.end() < cache_real_first_key.unwrap()
            || cache_real_last_key.is_some() && *range.start() > cache_real_last_key.unwrap()
        {
            tracing::debug!(
                partition=?self.deduped_part_key,
                cache_first=?cache_real_first_key,
                cache_last=?cache_real_last_key,
                range=?range,
                "modified range is completely non-overlapping with the cached range, re-initializing the cache"
            );
            *self.range_cache = new_empty_partition_cache();
        }

        if self.cache_real_len() == 0 {
            let table_sub_range = (
                Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.start())?),
                Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.end())?),
            );
            tracing::debug!(
                partition=?self.deduped_part_key,
                table_sub_range=?table_sub_range,
                "cache is empty, just loading the given range"
            );
            return self
                .extend_cache_by_range_inner(table, table_sub_range)
                .await;
        }

        let cache_real_first_key = self
            .cache_real_first_key()
            .expect("cache real len is not 0");
        if self.cache_left_is_sentinel() && *range.start() < cache_real_first_key {
            let table_sub_range = (
                Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.start())?),
                Bound::Excluded(
                    self.row_conv
                        .state_key_to_table_sub_pk(cache_real_first_key)?,
                ),
            );
            tracing::trace!(
                partition=?self.deduped_part_key,
                table_sub_range=?table_sub_range,
                "loading the left half of given range"
            );
            self.extend_cache_by_range_inner(table, table_sub_range)
                .await?;
        }

        let cache_real_last_key = self.cache_real_last_key().expect("cache real len is not 0");
        if self.cache_right_is_sentinel() && *range.end() > cache_real_last_key {
            let table_sub_range = (
                Bound::Excluded(
                    self.row_conv
                        .state_key_to_table_sub_pk(cache_real_last_key)?,
                ),
                Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.end())?),
            );
            tracing::trace!(
                partition=?self.deduped_part_key,
                table_sub_range=?table_sub_range,
                "loading the right half of given range"
            );
            self.extend_cache_by_range_inner(table, table_sub_range)
                .await?;
        }

        self.extend_cache_leftward_by_n(table, range.start())
            .await?;

        self.extend_cache_rightward_by_n(table, range.end()).await
    }

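    /// Extend the cache leftward by at most `MAGIC_BATCH_SIZE` entries, loading rows strictly
    /// less than the smallest normal cached key (or the hint key if there is none).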
    async fn extend_cache_leftward_by_n(
        &mut self,
        table: &StateTable<S>,
        hint_key: &StateKey,
    ) -> StreamExecutorResult<()> {
        if self.cache_real_len() == self.range_cache.len() {
            return Ok(());
        }
        assert!(self.range_cache.len() >= 2);

        let left_second = {
            let mut iter = self.range_cache.inner().iter();
            let left_first = iter.next().unwrap().0;
            if left_first.is_normal() {
                return Ok(());
            }
            iter.next().unwrap().0
        };
        let range_to_exclusive = match left_second {
            CacheKey::Normal(smallest_in_cache) => smallest_in_cache,
            CacheKey::Largest => hint_key,
            _ => unreachable!(),
        }
        .clone();

        self.extend_cache_leftward_by_n_inner(table, &range_to_exclusive)
            .await?;

        if self.cache_real_len() == 0 {
            self.extend_cache_rightward_by_n_inner(table, hint_key)
                .await?;
            if self.cache_real_len() == 0 {
                self.range_cache.remove(&CacheKey::Smallest);
                self.range_cache.remove(&CacheKey::Largest);
            }
        }

        Ok(())
    }

    async fn extend_cache_rightward_by_n(
        &mut self,
        table: &StateTable<S>,
        hint_key: &StateKey,
    ) -> StreamExecutorResult<()> {
        if self.cache_real_len() == self.range_cache.len() {
            return Ok(());
        }
        assert!(self.range_cache.len() >= 2);

        let right_second = {
            let mut iter = self.range_cache.inner().iter();
            let right_first = iter.next_back().unwrap().0;
            if right_first.is_normal() {
                return Ok(());
            }
            iter.next_back().unwrap().0
        };
        let range_from_exclusive = match right_second {
            CacheKey::Normal(largest_in_cache) => largest_in_cache,
            CacheKey::Smallest => hint_key,
            _ => unreachable!(),
        }
        .clone();

        self.extend_cache_rightward_by_n_inner(table, &range_from_exclusive)
            .await?;

        if self.cache_real_len() == 0 {
            self.extend_cache_leftward_by_n_inner(table, hint_key)
                .await?;
            if self.cache_real_len() == 0 {
                self.range_cache.remove(&CacheKey::Smallest);
                self.range_cache.remove(&CacheKey::Largest);
            }
        }

        Ok(())
    }

    async fn extend_cache_by_range_inner(
        &mut self,
        table: &StateTable<S>,
        table_sub_range: (Bound<impl Row>, Bound<impl Row>),
    ) -> StreamExecutorResult<()> {
        let stream = table
            .iter_with_prefix(
                self.deduped_part_key,
                &table_sub_range,
                PrefetchOptions::default(),
            )
            .await?;

        #[for_await]
        for row in stream {
            let row: OwnedRow = row?.into_owned_row();
            let key = self.row_conv.row_to_state_key(&row)?;
            self.range_cache.insert(CacheKey::from(key), row);
        }

        Ok(())
    }

    async fn extend_cache_leftward_by_n_inner(
        &mut self,
        table: &StateTable<S>,
        range_to_exclusive: &StateKey,
    ) -> StreamExecutorResult<()> {
        let mut n_extended = 0usize;
        {
            let sub_range = (
                Bound::<OwnedRow>::Unbounded,
                Bound::Excluded(
                    self.row_conv
                        .state_key_to_table_sub_pk(range_to_exclusive)?,
                ),
            );
            let rev_stream = table
                .rev_iter_with_prefix(
                    self.deduped_part_key,
                    &sub_range,
                    PrefetchOptions::default(),
                )
                .await?;

            #[for_await]
            for row in rev_stream {
                let row: OwnedRow = row?.into_owned_row();

                let key = self.row_conv.row_to_state_key(&row)?;
                self.range_cache.insert(CacheKey::from(key), row);

                n_extended += 1;
                if n_extended == MAGIC_BATCH_SIZE {
                    break;
                }
            }
        }

        if n_extended < MAGIC_BATCH_SIZE && self.cache_real_len() > 0 {
            self.range_cache.remove(&CacheKey::Smallest);
        }

        Ok(())
    }

    async fn extend_cache_rightward_by_n_inner(
        &mut self,
        table: &StateTable<S>,
        range_from_exclusive: &StateKey,
    ) -> StreamExecutorResult<()> {
        let mut n_extended = 0usize;
        {
            let sub_range = (
                Bound::Excluded(
                    self.row_conv
                        .state_key_to_table_sub_pk(range_from_exclusive)?,
                ),
                Bound::<OwnedRow>::Unbounded,
            );
            let stream = table
                .iter_with_prefix(
                    self.deduped_part_key,
                    &sub_range,
                    PrefetchOptions::default(),
                )
                .await?;

            #[for_await]
            for row in stream {
                let row: OwnedRow = row?.into_owned_row();

                let key = self.row_conv.row_to_state_key(&row)?;
                self.range_cache.insert(CacheKey::from(key), row);

                n_extended += 1;
                if n_extended == MAGIC_BATCH_SIZE {
                    break;
                }
            }
        }

        if n_extended < MAGIC_BATCH_SIZE && self.cache_real_len() > 0 {
            self.range_cache.remove(&CacheKey::Largest);
        }

        Ok(())
    }
}