use std::backtrace::Backtrace;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::time::{Duration, Instant};

use bytes::Bytes;
use foyer::Hint;
use futures::{Stream, StreamExt, pin_mut};
use parking_lot::Mutex;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::config::StorageMemoryConfig;
use risingwave_expr::codegen::try_stream;
use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{
    EmptySliceRef, FullKey, TableKey, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use tokio::sync::oneshot::{Receiver, Sender, channel};

use super::{HummockError, HummockResult, SstableStoreRef};
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::mem_table::{KeyOp, MemTableError};
use crate::monitor::MemoryCollector;
use crate::store::{
    OpConsistencyLevel, ReadOptions, StateStoreGet, StateStoreKeyedRow, StateStoreRead,
};

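/// Returns `true` if `search_key_range` overlaps the key range starting at
/// `inclusive_start_key` and ending at `end_key` (which may be inclusive or exclusive).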
pub fn range_overlap<R, B>(
    search_key_range: &R,
    inclusive_start_key: &B,
    end_key: Bound<&B>,
) -> bool
where
    R: RangeBounds<B>,
    B: Ord,
{
    let (start_bound, end_bound) = (search_key_range.start_bound(), search_key_range.end_bound());

    let too_left = match (start_bound, end_key) {
        (Included(range_start), Included(inclusive_end_key)) => range_start > inclusive_end_key,
        (Included(range_start), Excluded(end_key))
        | (Excluded(range_start), Included(end_key))
        | (Excluded(range_start), Excluded(end_key)) => range_start >= end_key,
        (Unbounded, _) | (_, Unbounded) => false,
    };
    let too_right = match end_bound {
        Included(range_end) => range_end < inclusive_start_key,
        Excluded(range_end) => range_end <= inclusive_start_key,
        Unbounded => false,
    };

    !too_left && !too_right
}

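/// Returns `true` if `info` may contain data of `table_id` within `table_key_range`:
/// the SST's key range must overlap the requested range and `table_id` must appear in
/// its `table_ids`.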
pub fn filter_single_sst<R, B>(info: &SstableInfo, table_id: TableId, table_key_range: &R) -> bool
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    let table_range = &info.key_range;
    let table_start = FullKey::decode(table_range.left.as_ref()).user_key;
    let table_end = FullKey::decode(table_range.right.as_ref()).user_key;
    let (left, right) = bound_table_key_range(table_id, table_key_range);
    let left: Bound<UserKey<&[u8]>> = left.as_ref().map(|key| key.as_ref());
    let right: Bound<UserKey<&[u8]>> = right.as_ref().map(|key| key.as_ref());
    range_overlap(
        &(left, right),
        &table_start,
        if table_range.right_exclusive {
            Bound::Excluded(&table_end)
        } else {
            Bound::Included(&table_end)
        },
    ) && info.table_ids.binary_search(&table_id.table_id()).is_ok()
}

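/// Returns the partition point of `ssts` for `key`, i.e. the number of SSTs whose left bound
/// is less than or equal to `key`. Assumes the SSTs are sorted by key range.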
pub(crate) fn search_sst_idx(ssts: &[SstableInfo], key: UserKey<&[u8]>) -> usize {
    ssts.partition_point(|table| {
        let ord = FullKey::decode(&table.key_range.left).user_key.cmp(&key);
        ord == Ordering::Less || ord == Ordering::Equal
    })
}

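/// Filters `ssts` down to those that may overlap `table_key_range` for `table_id`. Every SST
/// is checked individually via [`filter_single_sst`], so no ordering is required.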
pub fn prune_overlapping_ssts<'a, R, B>(
    ssts: &'a [SstableInfo],
    table_id: TableId,
    table_key_range: &'a R,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo>
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    ssts.iter()
        .filter(move |info| filter_single_sst(info, table_id, table_key_range))
}

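/// Narrows a run of non-overlapping SSTs (`can_concat` is expected to hold) to the slice that
/// may intersect `user_key_range`, then keeps only the SSTs containing `table_id`.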
#[allow(clippy::type_complexity)]
pub fn prune_nonoverlapping_ssts<'a>(
    ssts: &'a [SstableInfo],
    user_key_range: (Bound<UserKey<&'a [u8]>>, Bound<UserKey<&'a [u8]>>),
    table_id: StateTableId,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo> {
    debug_assert!(can_concat(ssts));
    let start_table_idx = match user_key_range.0 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => 0,
    };
    let end_table_idx = match user_key_range.1 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => ssts.len().saturating_sub(1),
    };
    ssts[start_table_idx..=end_table_idx]
        .iter()
        .filter(move |sst| sst.table_ids.binary_search(&table_id).is_ok())
}

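/// Queue of pending memory requests: the oneshot sender used to deliver the granted
/// [`MemoryTracker`], paired with the requested quota.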
type RequestQueue = VecDeque<(Sender<MemoryTracker>, u64)>;
enum MemoryRequest {
    Ready(MemoryTracker),
    Pending(Receiver<MemoryTracker>),
}

struct MemoryLimiterInner {
    total_size: AtomicU64,
    controller: Mutex<RequestQueue>,
    has_waiter: AtomicBool,
    quota: u64,
}

impl MemoryLimiterInner {
    fn release_quota(&self, quota: u64) {
        self.total_size.fetch_sub(quota, AtomicOrdering::SeqCst);
    }

    fn add_memory(&self, quota: u64) {
        self.total_size.fetch_add(quota, AtomicOrdering::SeqCst);
    }

    fn may_notify_waiters(self: &Arc<Self>) {
        if !self.has_waiter.load(AtomicOrdering::Acquire) {
            return;
        }
        let mut notify_waiters = vec![];
        {
            let mut waiters = self.controller.lock();
            while let Some((_, quota)) = waiters.front() {
                if !self.try_require_memory(*quota) {
                    break;
                }
                let (tx, quota) = waiters.pop_front().unwrap();
                notify_waiters.push((tx, quota));
            }

            if waiters.is_empty() {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
        }

        for (tx, quota) in notify_waiters {
            let _ = tx.send(MemoryTracker::new(self.clone(), quota));
        }
    }

    fn try_require_memory(&self, quota: u64) -> bool {
        let mut current_quota = self.total_size.load(AtomicOrdering::Acquire);
        while self.permit_quota(current_quota, quota) {
            match self.total_size.compare_exchange(
                current_quota,
                current_quota + quota,
                AtomicOrdering::SeqCst,
                AtomicOrdering::SeqCst,
            ) {
                Ok(_) => {
                    return true;
                }
                Err(old_quota) => {
                    current_quota = old_quota;
                }
            }
        }
        false
    }

    fn require_memory(self: &Arc<Self>, quota: u64) -> MemoryRequest {
        let mut waiters = self.controller.lock();
        let first_req = waiters.is_empty();
        if first_req {
            self.has_waiter.store(true, AtomicOrdering::Release);
        }
        if self.try_require_memory(quota) {
            if first_req {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
            return MemoryRequest::Ready(MemoryTracker::new(self.clone(), quota));
        }
        let (tx, rx) = channel();
        waiters.push_back((tx, quota));
        MemoryRequest::Pending(rx)
    }

    fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool {
        current_quota <= self.quota
    }
}

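/// Quota-based memory limiter. Memory is granted as [`MemoryTracker`]s; when the quota is
/// exhausted, `require_memory` callers queue up and are woken in FIFO order as earlier
/// trackers are dropped. The check is deliberately loose: a request is admitted as long as
/// usage before the grant does not exceed the quota, so total usage may overshoot it.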
pub struct MemoryLimiter {
    inner: Arc<MemoryLimiterInner>,
}

impl Debug for MemoryLimiter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryLimiter")
            .field("quota", &self.inner.quota)
            .field("usage", &self.inner.total_size)
            .finish()
    }
}

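/// RAII handle for quota obtained from a [`MemoryLimiter`]. Dropping the tracker releases
/// its quota and notifies pending waiters.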
pub struct MemoryTracker {
    limiter: Arc<MemoryLimiterInner>,
    quota: Option<u64>,
}
impl MemoryTracker {
    fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
        Self {
            limiter,
            quota: Some(quota),
        }
    }
}

impl Debug for MemoryTracker {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryTracker")
            .field("quota", &self.quota)
            .finish()
    }
}

impl MemoryLimiter {
    pub fn unlimit() -> Arc<Self> {
        Arc::new(Self::new(u64::MAX))
    }

    pub fn new(quota: u64) -> Self {
        Self {
            inner: Arc::new(MemoryLimiterInner {
                total_size: AtomicU64::new(0),
                controller: Mutex::new(VecDeque::default()),
                has_waiter: AtomicBool::new(false),
                quota,
            }),
        }
    }

    pub fn try_require_memory(&self, quota: u64) -> Option<MemoryTracker> {
        if self.inner.try_require_memory(quota) {
            Some(MemoryTracker::new(self.inner.clone(), quota))
        } else {
            None
        }
    }

    pub fn get_memory_usage(&self) -> u64 {
        self.inner.total_size.load(AtomicOrdering::Acquire)
    }

    pub fn quota(&self) -> u64 {
        self.inner.quota
    }

    pub fn must_require_memory(&self, quota: u64) -> MemoryTracker {
        if !self.inner.try_require_memory(quota) {
            self.inner.add_memory(quota);
        }

        MemoryTracker::new(self.inner.clone(), quota)
    }
}

impl MemoryLimiter {
    pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
        match self.inner.require_memory(quota) {
            MemoryRequest::Ready(tracker) => tracker,
            MemoryRequest::Pending(rx) => rx.await.unwrap(),
        }
    }
}

impl MemoryTracker {
    pub fn try_increase_memory(&mut self, target: u64) -> bool {
        let quota = self.quota.unwrap();
        if quota >= target {
            return true;
        }
        if self.limiter.try_require_memory(target - quota) {
            self.quota = Some(target);
            true
        } else {
            false
        }
    }
}

impl Drop for MemoryTracker {
    fn drop(&mut self) {
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
            self.limiter.may_notify_waiters();
        }
    }
}

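/// Returns `true` if `sub_iter` is a subsequence of `full_iter`, i.e. every item of `sub_iter`
/// can be matched in `full_iter` while preserving the relative order.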
pub fn check_subset_preserve_order<T: Eq>(
    sub_iter: impl Iterator<Item = T>,
    mut full_iter: impl Iterator<Item = T>,
) -> bool {
    for sub_iter_item in sub_iter {
        let mut found = false;
        for full_iter_item in full_iter.by_ref() {
            if sub_iter_item == full_iter_item {
                found = true;
                break;
            }
        }
        if !found {
            return false;
        }
    }
    true
}

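/// Global toggle for the write sanity checks below. Enabled by default in debug builds,
/// disabled in release builds; `disable_sanity_check` turns it off at runtime.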
static SANITY_CHECK_ENABLED: AtomicBool = AtomicBool::new(cfg!(debug_assertions));

pub fn disable_sanity_check() {
    SANITY_CHECK_ENABLED.store(false, AtomicOrdering::Release);
}

pub(crate) fn sanity_check_enabled() -> bool {
    SANITY_CHECK_ENABLED.load(AtomicOrdering::Acquire)
}

async fn get_from_state_store(
    state_store: &impl StateStoreGet,
    key: TableKey<Bytes>,
    read_options: ReadOptions,
) -> StorageResult<Option<Bytes>> {
    state_store
        .on_key_value(key, read_options, |_, value| {
            Ok(Bytes::copy_from_slice(value))
        })
        .await
}

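/// Sanity check for insert: unless the table runs at `OpConsistencyLevel::Inconsistent`,
/// the key to be inserted must not already exist in storage.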
pub(crate) async fn do_insert_sanity_check(
    table_id: TableId,
    key: &TableKey<Bytes>,
    value: &Bytes,
    inner: &impl StateStoreRead,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    if let OpConsistencyLevel::Inconsistent = op_consistency_level {
        return Ok(());
    }
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        cache_policy: CachePolicy::Fill(Hint::Normal),
        ..Default::default()
    };
    let stored_value = get_from_state_store(inner, key.clone(), read_options).await?;

    if let Some(stored_value) = stored_value {
        return Err(Box::new(MemTableError::InconsistentOperation {
            table_id,
            key: key.clone(),
            prev: KeyOp::Insert(stored_value),
            new: KeyOp::Insert(value.clone()),
        })
        .into());
    }
    Ok(())
}

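/// Sanity check for delete: under `ConsistentOldValue`, the key must exist in storage and
/// the stored value must match the `old_value` passed by the caller.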
pub(crate) async fn do_delete_sanity_check(
    table_id: TableId,
    key: &TableKey<Bytes>,
    old_value: &Bytes,
    inner: &impl StateStoreRead,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    let OpConsistencyLevel::ConsistentOldValue {
        check_old_value: old_value_checker,
        ..
    } = op_consistency_level
    else {
        return Ok(());
    };
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        cache_policy: CachePolicy::Fill(Hint::Normal),
        ..Default::default()
    };
    match get_from_state_store(inner, key.clone(), read_options).await? {
        None => Err(Box::new(MemTableError::InconsistentOperation {
            table_id,
            key: key.clone(),
            prev: KeyOp::Delete(Bytes::default()),
            new: KeyOp::Delete(old_value.clone()),
        })
        .into()),
        Some(stored_value) => {
            if !old_value_checker(&stored_value, old_value) {
                Err(Box::new(MemTableError::InconsistentOperation {
                    table_id,
                    key: key.clone(),
                    prev: KeyOp::Insert(stored_value),
                    new: KeyOp::Delete(old_value.clone()),
                })
                .into())
            } else {
                Ok(())
            }
        }
    }
}

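/// Sanity check for update: under `ConsistentOldValue`, the key must exist in storage and
/// the stored value must match the `old_value` being replaced.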
pub(crate) async fn do_update_sanity_check(
    table_id: TableId,
    key: &TableKey<Bytes>,
    old_value: &Bytes,
    new_value: &Bytes,
    inner: &impl StateStoreRead,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    let OpConsistencyLevel::ConsistentOldValue {
        check_old_value: old_value_checker,
        ..
    } = op_consistency_level
    else {
        return Ok(());
    };
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        cache_policy: CachePolicy::Fill(Hint::Normal),
        ..Default::default()
    };

    match get_from_state_store(inner, key.clone(), read_options).await? {
        None => Err(Box::new(MemTableError::InconsistentOperation {
            table_id,
            key: key.clone(),
            prev: KeyOp::Delete(Bytes::default()),
            new: KeyOp::Update((old_value.clone(), new_value.clone())),
        })
        .into()),
        Some(stored_value) => {
            if !old_value_checker(&stored_value, old_value) {
                Err(Box::new(MemTableError::InconsistentOperation {
                    table_id,
                    key: key.clone(),
                    prev: KeyOp::Insert(stored_value),
                    new: KeyOp::Update((old_value.clone(), new_value.clone())),
                })
                .into())
            } else {
                Ok(())
            }
        }
    }
}

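/// Compares two left bounds of delete ranges. On the same key, an included bound is
/// considered smaller than an excluded one; unbounded left bounds are not expected here.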
pub fn cmp_delete_range_left_bounds(a: Bound<&Bytes>, b: Bound<&Bytes>) -> Ordering {
    match (a, b) {
        (Unbounded, _) | (_, Unbounded) => unreachable!(),
        (Included(x), Included(y)) | (Excluded(x), Excluded(y)) => x.cmp(y),
        (Included(x), Excluded(y)) => x.cmp(y).then(Ordering::Less),
        (Excluded(x), Included(y)) => x.cmp(y).then(Ordering::Greater),
    }
}

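/// Returns `true` if `left`/`right` form a well-formed, non-empty delete range. A left bound
/// of `Unbounded` is not expected.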
pub(crate) fn validate_delete_range(left: &Bound<Bytes>, right: &Bound<Bytes>) -> bool {
    match (left, right) {
        (Unbounded, _) => unreachable!(),
        (_, Unbounded) => true,
        (Included(x), Included(y)) => x <= y,
        (Included(x), Excluded(y)) | (Excluded(x), Included(y)) | (Excluded(x), Excluded(y)) => {
            x < y
        }
    }
}

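/// Filters `kv_iter`, dropping every key that falls into one of the delete ranges. The delete
/// ranges are expected to be sorted in ascending order and are validated on the fly.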
#[expect(dead_code)]
pub(crate) fn filter_with_delete_range<'a>(
    kv_iter: impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a,
    mut delete_ranges_iter: impl Iterator<Item = &'a (Bound<Bytes>, Bound<Bytes>)> + 'a,
) -> impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a {
    let mut range = delete_ranges_iter.next();
    if let Some((range_start, range_end)) = range {
        assert!(
            validate_delete_range(range_start, range_end),
            "range_end {:?} smaller than range_start {:?}",
            range_end,
            range_start
        );
    }
    kv_iter.filter(move |(key, _)| {
        if let Some(range_bound) = range {
            if cmp_delete_range_left_bounds(Included(&key.0), range_bound.0.as_ref())
                == Ordering::Less
            {
                true
            } else if range_bound.contains(key.as_ref()) {
                false
            } else {
                loop {
                    range = delete_ranges_iter.next();
                    if let Some(range_bound) = range {
                        assert!(
                            validate_delete_range(&range_bound.0, &range_bound.1),
                            "range_end {:?} smaller than range_start {:?}",
                            range_bound.1,
                            range_bound.0
                        );
                        if cmp_delete_range_left_bounds(Included(key), range_bound.0.as_ref())
                            == Ordering::Less
                        {
                            break true;
                        } else if range_bound.contains(key.as_ref()) {
                            break false;
                        } else {
                            continue;
                        }
                    } else {
                        break true;
                    }
                }
            }
        } else {
            true
        }
    })
}

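/// Waits until the committed epoch of `table_id` in the watched version reaches `wait_epoch`.
/// Returns an error if the table's committed epoch disappears, i.e. the table has been dropped.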
pub(crate) async fn wait_for_epoch(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    wait_epoch: u64,
    table_id: TableId,
) -> StorageResult<PinnedVersion> {
    let mut prev_committed_epoch = None;
    let prev_committed_epoch = &mut prev_committed_epoch;
    let version = wait_for_update(
        notifier,
        |version| {
            let committed_epoch = version.table_committed_epoch(table_id);
            let ret = if let Some(committed_epoch) = committed_epoch {
                if committed_epoch >= wait_epoch {
                    Ok(true)
                } else {
                    Ok(false)
                }
            } else if prev_committed_epoch.is_none() {
                Ok(false)
            } else {
                Err(HummockError::wait_epoch(format!(
                    "table {} has been dropped",
                    table_id
                )))
            };
            *prev_committed_epoch = committed_epoch;
            ret
        },
        || {
            format!(
                "wait_for_epoch: epoch: {}, table_id: {}",
                wait_epoch, table_id
            )
        },
    )
    .await?;
    Ok(version)
}

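/// Subscribes to `notifier` and waits until `inspect_fn` returns `Ok(true)` for an observed
/// version. Logs a warning built from `periodic_debug_info` every 30 seconds while waiting.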
pub(crate) async fn wait_for_update(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    mut inspect_fn: impl FnMut(&PinnedVersion) -> HummockResult<bool>,
    mut periodic_debug_info: impl FnMut() -> String,
) -> HummockResult<PinnedVersion> {
    let mut receiver = notifier.subscribe();
    {
        let version = receiver.borrow_and_update();
        if inspect_fn(&version)? {
            return Ok(version.clone());
        }
    }
    let start_time = Instant::now();
    loop {
        match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
            Err(_) => {
                let backtrace = cfg!(debug_assertions)
                    .then(Backtrace::capture)
                    .map(tracing::field::display);

                tracing::warn!(
                    info = periodic_debug_info(),
                    elapsed = ?start_time.elapsed(),
                    backtrace,
                    "timeout when waiting for version update",
                );
                continue;
            }
            Ok(Err(_)) => {
                return Err(HummockError::wait_epoch("tx dropped"));
            }
            Ok(Ok(_)) => {
                let version = receiver.borrow_and_update();
                if inspect_fn(&version)? {
                    return Ok(version.clone());
                }
            }
        }
    }
}

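/// [`MemoryCollector`] implementation that reports meta-cache, block-cache, uploading
/// (shared-buffer) and prefetch memory usage for monitoring.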
pub struct HummockMemoryCollector {
    sstable_store: SstableStoreRef,
    limiter: Arc<MemoryLimiter>,
    storage_memory_config: StorageMemoryConfig,
}

impl HummockMemoryCollector {
    pub fn new(
        sstable_store: SstableStoreRef,
        limiter: Arc<MemoryLimiter>,
        storage_memory_config: StorageMemoryConfig,
    ) -> Self {
        Self {
            sstable_store,
            limiter,
            storage_memory_config,
        }
    }
}

impl MemoryCollector for HummockMemoryCollector {
    fn get_meta_memory_usage(&self) -> u64 {
        self.sstable_store.meta_cache().memory().usage() as _
    }

    fn get_data_memory_usage(&self) -> u64 {
        self.sstable_store.block_cache().memory().usage() as _
    }

    fn get_uploading_memory_usage(&self) -> u64 {
        self.limiter.get_memory_usage()
    }

    fn get_prefetch_memory_usage(&self) -> usize {
        self.sstable_store.get_prefetch_memory_usage()
    }

    fn get_meta_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.meta_cache().memory().usage() as f64
            / self.sstable_store.meta_cache().memory().capacity() as f64
    }

    fn get_block_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.block_cache().memory().usage() as f64
            / self.sstable_store.block_cache().memory().capacity() as f64
    }

    fn get_shared_buffer_usage_ratio(&self) -> f64 {
        self.limiter.get_memory_usage() as f64
            / (self.storage_memory_config.shared_buffer_capacity_mb * 1024 * 1024) as f64
    }
}

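// Merges the mem-table changes with the underlying storage stream into a single ordered
// stream of keyed rows. `rev` indicates that both inputs iterate in reverse key order.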
#[try_stream(ok = StateStoreKeyedRow, error = StorageError)]
pub(crate) async fn merge_stream<'a>(
    mem_table_iter: impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)> + 'a,
    inner_stream: impl Stream<Item = StorageResult<StateStoreKeyedRow>> + 'static,
    table_id: TableId,
    epoch: u64,
    rev: bool,
) {
    let inner_stream = inner_stream.peekable();
    pin_mut!(inner_stream);

    let mut mem_table_iter = mem_table_iter.fuse().peekable();

    loop {
        match (inner_stream.as_mut().peek().await, mem_table_iter.peek()) {
            (None, None) => break,
            (Some(_), None) => {
                let (key, value) = inner_stream.next().await.unwrap()?;
                yield (key, value)
            }
            (None, Some(_)) => {
                let (key, key_op) = mem_table_iter.next().unwrap();
                match key_op {
                    KeyOp::Insert(value) | KeyOp::Update((_, value)) => {
                        yield (FullKey::new(table_id, key.clone(), epoch), value.clone())
                    }
                    _ => {}
                }
            }
            (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => {
                debug_assert_eq!(inner_key.user_key.table_id, table_id);
                let mut ret = inner_key.user_key.table_key.cmp(mem_table_key);
                if rev {
                    ret = ret.reverse();
                }
                match ret {
                    Ordering::Less => {
                        let (key, value) = inner_stream.next().await.unwrap()?;
                        yield (key, value);
                    }
                    Ordering::Equal => {
                        let (_, key_op) = mem_table_iter.next().unwrap();
                        let (key, old_value_in_inner) = inner_stream.next().await.unwrap()?;
                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (key.clone(), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update((old_value, new_value)) => {
                                debug_assert!(old_value == &old_value_in_inner);

                                yield (key, new_value.clone());
                            }
                        }
                    }
                    Ordering::Greater => {
                        let (key, key_op) = mem_table_iter.next().unwrap();

                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (FullKey::new(table_id, key.clone(), epoch), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update(_) => unreachable!(
                                "memtable update should always be paired with a storage key"
                            ),
                        }
                    }
                }
            }
            (Some(Err(_)), Some(_)) => {
                return Err(inner_stream.next().await.unwrap().unwrap_err());
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::sync::Arc;
    use std::task::Poll;

    use futures::FutureExt;
    use futures::future::join_all;
    use rand::random_range;

    use crate::hummock::utils::MemoryLimiter;

    async fn assert_pending(future: &mut (impl Future + Unpin)) {
        for _ in 0..10 {
            assert!(
                poll_fn(|cx| Poll::Ready(future.poll_unpin(cx)))
                    .await
                    .is_pending()
            );
        }
    }

    #[tokio::test]
    async fn test_loose_memory_limiter() {
        let quota = 5;
        let memory_limiter = MemoryLimiter::new(quota);
        drop(memory_limiter.require_memory(6).await);
        let tracker1 = memory_limiter.require_memory(3).await;
        assert_eq!(3, memory_limiter.get_memory_usage());
        let tracker2 = memory_limiter.require_memory(4).await;
        assert_eq!(7, memory_limiter.get_memory_usage());
        let mut future = memory_limiter.require_memory(5).boxed();
        assert_pending(&mut future).await;
        assert_eq!(7, memory_limiter.get_memory_usage());
        drop(tracker1);
        let tracker3 = future.await;
        assert_eq!(9, memory_limiter.get_memory_usage());
        drop(tracker2);
        assert_eq!(5, memory_limiter.get_memory_usage());
        drop(tracker3);
        assert_eq!(0, memory_limiter.get_memory_usage());
    }
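
    // Illustrative sketch (not part of the original tests): exercises the pure helpers
    // `range_overlap` and `check_subset_preserve_order` with plain integer keys.
    #[test]
    fn test_range_overlap_and_subset_order() {
        use std::ops::Bound::{Excluded, Included, Unbounded};

        use super::{check_subset_preserve_order, range_overlap};

        // The search range [2, 5] overlaps the key range [1, 3].
        assert!(range_overlap(&(Included(2), Included(5)), &1, Included(&3)));
        // The search range [4, +inf) misses [1, 4) because the right end is exclusive.
        assert!(!range_overlap(&(Included(4), Unbounded), &1, Excluded(&4)));

        // A subsequence in the same relative order is accepted...
        assert!(check_subset_preserve_order(
            [1, 3].into_iter(),
            [1, 2, 3].into_iter()
        ));
        // ...while a reordered subset is rejected.
        assert!(!check_subset_preserve_order(
            [3, 1].into_iter(),
            [1, 2, 3].into_iter()
        ));
    }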

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_multi_thread_acquire_memory() {
        const QUOTA: u64 = 10;
        let memory_limiter = Arc::new(MemoryLimiter::new(200));
        let mut handles = vec![];
        for _ in 0..40 {
            let limiter = memory_limiter.clone();
            let h = tokio::spawn(async move {
                let mut buffers = vec![];
                let mut current_buffer_usage = random_range(2..=9);
                for _ in 0..1000 {
                    if buffers.len() < current_buffer_usage
                        && let Some(tracker) = limiter.try_require_memory(QUOTA)
                    {
                        buffers.push(tracker);
                    } else {
                        buffers.clear();
                        current_buffer_usage = random_range(2..=9);
                        let req = limiter.require_memory(QUOTA);
                        match tokio::time::timeout(std::time::Duration::from_millis(1), req).await {
                            Ok(tracker) => {
                                buffers.push(tracker);
                            }
                            Err(_) => {
                                continue;
                            }
                        }
                    }
                    let sleep_time = random_range(1..=3);
                    tokio::time::sleep(std::time::Duration::from_millis(sleep_time)).await;
                }
            });
            handles.push(h);
        }
        let h = join_all(handles);
        let _ = h.await;
    }
}