use std::backtrace::Backtrace;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::fmt::{Debug, Formatter};
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
use std::time::{Duration, Instant};

use bytes::Bytes;
use foyer::CacheHint;
use futures::{Stream, StreamExt, pin_mut};
use parking_lot::Mutex;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::config::StorageMemoryConfig;
use risingwave_expr::codegen::try_stream;
use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{
    EmptySliceRef, FullKey, TableKey, UserKey, bound_table_key_range,
};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use tokio::sync::oneshot::{Receiver, Sender, channel};

use super::{HummockError, HummockResult, SstableStoreRef};
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::mem_table::{KeyOp, MemTableError};
use crate::monitor::MemoryCollector;
use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreKeyedRow, StateStoreRead};

pub fn range_overlap<R, B>(
    search_key_range: &R,
    inclusive_start_key: &B,
    end_key: Bound<&B>,
) -> bool
where
    R: RangeBounds<B>,
    B: Ord,
{
    let (start_bound, end_bound) = (search_key_range.start_bound(), search_key_range.end_bound());

    let too_left = match (start_bound, end_key) {
        (Included(range_start), Included(inclusive_end_key)) => range_start > inclusive_end_key,
        (Included(range_start), Excluded(end_key))
        | (Excluded(range_start), Included(end_key))
        | (Excluded(range_start), Excluded(end_key)) => range_start >= end_key,
        (Unbounded, _) | (_, Unbounded) => false,
    };
    let too_right = match end_bound {
        Included(range_end) => range_end < inclusive_start_key,
        Excluded(range_end) => range_end <= inclusive_start_key,
        Unbounded => false,
    };

    !too_left && !too_right
}
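// Illustrative sketch only (not from the original source): `range_overlap` treats its
// second and third arguments as the inclusive start and the (possibly exclusive) end of
// an existing key span, e.g.
//     range_overlap(&(Included(1), Excluded(5)), &3, Included(&10)) == true  // [1, 5) meets [3, 10]
//     range_overlap(&(Included(1), Excluded(3)), &3, Included(&10)) == false // [1, 3) ends before 3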

pub fn filter_single_sst<R, B>(info: &SstableInfo, table_id: TableId, table_key_range: &R) -> bool
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    let table_range = &info.key_range;
    let table_start = FullKey::decode(table_range.left.as_ref()).user_key;
    let table_end = FullKey::decode(table_range.right.as_ref()).user_key;
    let (left, right) = bound_table_key_range(table_id, table_key_range);
    let left: Bound<UserKey<&[u8]>> = left.as_ref().map(|key| key.as_ref());
    let right: Bound<UserKey<&[u8]>> = right.as_ref().map(|key| key.as_ref());
    range_overlap(
        &(left, right),
        &table_start,
        if table_range.right_exclusive {
            Bound::Excluded(&table_end)
        } else {
            Bound::Included(&table_end)
        },
    ) && info.table_ids.binary_search(&table_id.table_id()).is_ok()
}

pub(crate) fn search_sst_idx(ssts: &[SstableInfo], key: UserKey<&[u8]>) -> usize {
    ssts.partition_point(|table| {
        let ord = FullKey::decode(&table.key_range.left).user_key.cmp(&key);
        ord == Ordering::Less || ord == Ordering::Equal
    })
}
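// Illustrative note (not from the original source): `partition_point` above returns the
// index of the first SST whose smallest user key is strictly greater than `key`, so for
// SSTs with left bounds ["a", "c", "e"] and key "d" it returns 2.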

pub fn prune_overlapping_ssts<'a, R, B>(
    ssts: &'a [SstableInfo],
    table_id: TableId,
    table_key_range: &'a R,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo>
where
    R: RangeBounds<TableKey<B>>,
    B: AsRef<[u8]> + EmptySliceRef,
{
    ssts.iter()
        .filter(move |info| filter_single_sst(info, table_id, table_key_range))
}

#[allow(clippy::type_complexity)]
pub fn prune_nonoverlapping_ssts<'a>(
    ssts: &'a [SstableInfo],
    user_key_range: (Bound<UserKey<&'a [u8]>>, Bound<UserKey<&'a [u8]>>),
    table_id: StateTableId,
) -> impl DoubleEndedIterator<Item = &'a SstableInfo> {
    debug_assert!(can_concat(ssts));
    let start_table_idx = match user_key_range.0 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => 0,
    };
    let end_table_idx = match user_key_range.1 {
        Included(key) | Excluded(key) => search_sst_idx(ssts, key).saturating_sub(1),
        _ => ssts.len().saturating_sub(1),
    };
    ssts[start_table_idx..=end_table_idx]
        .iter()
        .filter(move |sst| sst.table_ids.binary_search(&table_id).is_ok())
}
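// Illustrative note (not from the original source): since `search_sst_idx` returns the
// index *past* the last SST whose left bound is <= the query key, stepping back by one
// with `saturating_sub(1)` lands on the SST that may still contain that key. With left
// bounds ["a", "c", "e"] and a range starting at "d", the search returns 2 and the scan
// starts at index 1, i.e. the SST beginning at "c".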

type RequestQueue = VecDeque<(Sender<MemoryTracker>, u64)>;
enum MemoryRequest {
    Ready(MemoryTracker),
    Pending(Receiver<MemoryTracker>),
}

struct MemoryLimiterInner {
    total_size: AtomicU64,
    controller: Mutex<RequestQueue>,
    has_waiter: AtomicBool,
    quota: u64,
}

impl MemoryLimiterInner {
    fn release_quota(&self, quota: u64) {
        self.total_size.fetch_sub(quota, AtomicOrdering::SeqCst);
    }

    fn add_memory(&self, quota: u64) {
        self.total_size.fetch_add(quota, AtomicOrdering::SeqCst);
    }

    fn may_notify_waiters(self: &Arc<Self>) {
        if !self.has_waiter.load(AtomicOrdering::Acquire) {
            return;
        }
        let mut notify_waiters = vec![];
        {
            let mut waiters = self.controller.lock();
            while let Some((_, quota)) = waiters.front() {
                if !self.try_require_memory(*quota) {
                    break;
                }
                let (tx, quota) = waiters.pop_front().unwrap();
                notify_waiters.push((tx, quota));
            }

            if waiters.is_empty() {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
        }

        for (tx, quota) in notify_waiters {
            let _ = tx.send(MemoryTracker::new(self.clone(), quota));
        }
    }

    fn try_require_memory(&self, quota: u64) -> bool {
        let mut current_quota = self.total_size.load(AtomicOrdering::Acquire);
        while self.permit_quota(current_quota, quota) {
            match self.total_size.compare_exchange(
                current_quota,
                current_quota + quota,
                AtomicOrdering::SeqCst,
                AtomicOrdering::SeqCst,
            ) {
                Ok(_) => {
                    return true;
                }
                Err(old_quota) => {
                    current_quota = old_quota;
                }
            }
        }
        false
    }

    fn require_memory(self: &Arc<Self>, quota: u64) -> MemoryRequest {
        let mut waiters = self.controller.lock();
        let first_req = waiters.is_empty();
        if first_req {
            self.has_waiter.store(true, AtomicOrdering::Release);
        }
        if self.try_require_memory(quota) {
            if first_req {
                self.has_waiter.store(false, AtomicOrdering::Release);
            }
            return MemoryRequest::Ready(MemoryTracker::new(self.clone(), quota));
        }
        let (tx, rx) = channel();
        waiters.push_back((tx, quota));
        MemoryRequest::Pending(rx)
    }

    fn permit_quota(&self, current_quota: u64, _request_quota: u64) -> bool {
        current_quota <= self.quota
    }
}

pub struct MemoryLimiter {
    inner: Arc<MemoryLimiterInner>,
}

impl Debug for MemoryLimiter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryLimiter")
            .field("quota", &self.inner.quota)
            .field("usage", &self.inner.total_size)
            .finish()
    }
}

pub struct MemoryTracker {
    limiter: Arc<MemoryLimiterInner>,
    quota: Option<u64>,
}

impl MemoryTracker {
    fn new(limiter: Arc<MemoryLimiterInner>, quota: u64) -> Self {
        Self {
            limiter,
            quota: Some(quota),
        }
    }
}

impl Debug for MemoryTracker {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MemoryTracker")
            .field("quota", &self.quota)
            .finish()
    }
}

impl MemoryLimiter {
    pub fn unlimit() -> Arc<Self> {
        Arc::new(Self::new(u64::MAX))
    }

    pub fn new(quota: u64) -> Self {
        Self {
            inner: Arc::new(MemoryLimiterInner {
                total_size: AtomicU64::new(0),
                controller: Mutex::new(VecDeque::default()),
                has_waiter: AtomicBool::new(false),
                quota,
            }),
        }
    }

    pub fn try_require_memory(&self, quota: u64) -> Option<MemoryTracker> {
        if self.inner.try_require_memory(quota) {
            Some(MemoryTracker::new(self.inner.clone(), quota))
        } else {
            None
        }
    }

    pub fn get_memory_usage(&self) -> u64 {
        self.inner.total_size.load(AtomicOrdering::Acquire)
    }

    pub fn quota(&self) -> u64 {
        self.inner.quota
    }

    pub fn must_require_memory(&self, quota: u64) -> MemoryTracker {
        if !self.inner.try_require_memory(quota) {
            self.inner.add_memory(quota);
        }

        MemoryTracker::new(self.inner.clone(), quota)
    }
}

impl MemoryLimiter {
    pub async fn require_memory(&self, quota: u64) -> MemoryTracker {
        match self.inner.require_memory(quota) {
            MemoryRequest::Ready(tracker) => tracker,
            MemoryRequest::Pending(rx) => rx.await.unwrap(),
        }
    }
}
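// Illustrative usage sketch only (not from the original source). A caller that wants
// `size` bytes of quota can first try the non-blocking path and fall back to waiting in
// FIFO order; dropping the returned `MemoryTracker` releases the quota and wakes any
// pending waiters:
//
//     let limiter = MemoryLimiter::new(64 << 20);
//     let tracker = match limiter.try_require_memory(size) {
//         Some(tracker) => tracker,
//         None => limiter.require_memory(size).await,
//     };
//     // ... use the reserved quota ...
//     drop(tracker);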

impl MemoryTracker {
    pub fn try_increase_memory(&mut self, target: u64) -> bool {
        let quota = self.quota.unwrap();
        if quota >= target {
            return true;
        }
        if self.limiter.try_require_memory(target - quota) {
            self.quota = Some(target);
            true
        } else {
            false
        }
    }
}

impl Drop for MemoryTracker {
    fn drop(&mut self) {
        if let Some(quota) = self.quota.take() {
            self.limiter.release_quota(quota);
            self.limiter.may_notify_waiters();
        }
    }
}

pub fn check_subset_preserve_order<T: Eq>(
    sub_iter: impl Iterator<Item = T>,
    mut full_iter: impl Iterator<Item = T>,
) -> bool {
    for sub_iter_item in sub_iter {
        let mut found = false;
        for full_iter_item in full_iter.by_ref() {
            if sub_iter_item == full_iter_item {
                found = true;
                break;
            }
        }
        if !found {
            return false;
        }
    }
    true
}
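// Illustrative note (not from the original source): the check succeeds only when the
// items of `sub_iter` appear in `full_iter` in the same relative order, e.g.
//     check_subset_preserve_order([1, 3].into_iter(), [1, 2, 3].into_iter()) == true
//     check_subset_preserve_order([3, 1].into_iter(), [1, 2, 3].into_iter()) == false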

static SANITY_CHECK_ENABLED: AtomicBool = AtomicBool::new(cfg!(debug_assertions));

pub fn disable_sanity_check() {
    SANITY_CHECK_ENABLED.store(false, AtomicOrdering::Release);
}

pub(crate) fn sanity_check_enabled() -> bool {
    SANITY_CHECK_ENABLED.load(AtomicOrdering::Acquire)
}

pub(crate) async fn do_insert_sanity_check(
    key: &TableKey<Bytes>,
    value: &Bytes,
    inner: &impl StateStoreRead,
    table_id: TableId,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    if let OpConsistencyLevel::Inconsistent = op_consistency_level {
        return Ok(());
    }
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        table_id,
        cache_policy: CachePolicy::Fill(CacheHint::Normal),
        ..Default::default()
    };
    let stored_value = inner.get(key.clone(), read_options).await?;

    if let Some(stored_value) = stored_value {
        return Err(Box::new(MemTableError::InconsistentOperation {
            key: key.clone(),
            prev: KeyOp::Insert(stored_value),
            new: KeyOp::Insert(value.clone()),
        })
        .into());
    }
    Ok(())
}

pub(crate) async fn do_delete_sanity_check(
    key: &TableKey<Bytes>,
    old_value: &Bytes,
    inner: &impl StateStoreRead,
    table_id: TableId,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    let OpConsistencyLevel::ConsistentOldValue {
        check_old_value: old_value_checker,
        ..
    } = op_consistency_level
    else {
        return Ok(());
    };
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        table_id,
        cache_policy: CachePolicy::Fill(CacheHint::Normal),
        ..Default::default()
    };
    match inner.get(key.clone(), read_options).await? {
        None => Err(Box::new(MemTableError::InconsistentOperation {
            key: key.clone(),
            prev: KeyOp::Delete(Bytes::default()),
            new: KeyOp::Delete(old_value.clone()),
        })
        .into()),
        Some(stored_value) => {
            if !old_value_checker(&stored_value, old_value) {
                Err(Box::new(MemTableError::InconsistentOperation {
                    key: key.clone(),
                    prev: KeyOp::Insert(stored_value),
                    new: KeyOp::Delete(old_value.clone()),
                })
                .into())
            } else {
                Ok(())
            }
        }
    }
}

pub(crate) async fn do_update_sanity_check(
    key: &TableKey<Bytes>,
    old_value: &Bytes,
    new_value: &Bytes,
    inner: &impl StateStoreRead,
    table_id: TableId,
    table_option: TableOption,
    op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
    let OpConsistencyLevel::ConsistentOldValue {
        check_old_value: old_value_checker,
        ..
    } = op_consistency_level
    else {
        return Ok(());
    };
    let read_options = ReadOptions {
        retention_seconds: table_option.retention_seconds,
        table_id,
        cache_policy: CachePolicy::Fill(CacheHint::Normal),
        ..Default::default()
    };

    match inner.get(key.clone(), read_options).await? {
        None => Err(Box::new(MemTableError::InconsistentOperation {
            key: key.clone(),
            prev: KeyOp::Delete(Bytes::default()),
            new: KeyOp::Update((old_value.clone(), new_value.clone())),
        })
        .into()),
        Some(stored_value) => {
            if !old_value_checker(&stored_value, old_value) {
                Err(Box::new(MemTableError::InconsistentOperation {
                    key: key.clone(),
                    prev: KeyOp::Insert(stored_value),
                    new: KeyOp::Update((old_value.clone(), new_value.clone())),
                })
                .into())
            } else {
                Ok(())
            }
        }
    }
}

pub fn cmp_delete_range_left_bounds(a: Bound<&Bytes>, b: Bound<&Bytes>) -> Ordering {
    match (a, b) {
        (Unbounded, _) | (_, Unbounded) => unreachable!(),
        (Included(x), Included(y)) | (Excluded(x), Excluded(y)) => x.cmp(y),
        (Included(x), Excluded(y)) => x.cmp(y).then(Ordering::Less),
        (Excluded(x), Included(y)) => x.cmp(y).then(Ordering::Greater),
    }
}
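// Illustrative note (not from the original source): for equal keys an `Included` left
// bound sorts before an `Excluded` one, since `[k, ..)` starts earlier than `(k, ..)`;
// for different keys the key comparison alone decides.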

pub(crate) fn validate_delete_range(left: &Bound<Bytes>, right: &Bound<Bytes>) -> bool {
    match (left, right) {
        (Unbounded, _) => unreachable!(),
        (_, Unbounded) => true,
        (Included(x), Included(y)) => x <= y,
        (Included(x), Excluded(y)) | (Excluded(x), Included(y)) | (Excluded(x), Excluded(y)) => {
            x < y
        }
    }
}

#[expect(dead_code)]
pub(crate) fn filter_with_delete_range<'a>(
    kv_iter: impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a,
    mut delete_ranges_iter: impl Iterator<Item = &'a (Bound<Bytes>, Bound<Bytes>)> + 'a,
) -> impl Iterator<Item = (TableKey<Bytes>, KeyOp)> + 'a {
    let mut range = delete_ranges_iter.next();
    if let Some((range_start, range_end)) = range {
        assert!(
            validate_delete_range(range_start, range_end),
            "range_end {:?} smaller than range_start {:?}",
            range_end,
            range_start
        );
    }
    kv_iter.filter(move |(key, _)| {
        if let Some(range_bound) = range {
            if cmp_delete_range_left_bounds(Included(&key.0), range_bound.0.as_ref())
                == Ordering::Less
            {
                true
            } else if range_bound.contains(key.as_ref()) {
                false
            } else {
                loop {
                    range = delete_ranges_iter.next();
                    if let Some(range_bound) = range {
                        assert!(
                            validate_delete_range(&range_bound.0, &range_bound.1),
                            "range_end {:?} smaller than range_start {:?}",
                            range_bound.1,
                            range_bound.0
                        );
                        if cmp_delete_range_left_bounds(Included(key), range_bound.0.as_ref())
                            == Ordering::Less
                        {
                            break true;
                        } else if range_bound.contains(key.as_ref()) {
                            break false;
                        } else {
                            continue;
                        }
                    } else {
                        break true;
                    }
                }
            }
        } else {
            true
        }
    })
}

pub(crate) async fn wait_for_epoch(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    wait_epoch: u64,
    table_id: TableId,
) -> StorageResult<()> {
    let mut prev_committed_epoch = None;
    let prev_committed_epoch = &mut prev_committed_epoch;
    wait_for_update(
        notifier,
        |version| {
            let committed_epoch = version.table_committed_epoch(table_id);
            let ret = if let Some(committed_epoch) = committed_epoch {
                if committed_epoch >= wait_epoch {
                    Ok(true)
                } else {
                    Ok(false)
                }
            } else if prev_committed_epoch.is_none() {
                Ok(false)
            } else {
                Err(HummockError::wait_epoch(format!(
                    "table {} has been dropped",
                    table_id
                )))
            };
            *prev_committed_epoch = committed_epoch;
            ret
        },
        || {
            format!(
                "wait_for_epoch: epoch: {}, table_id: {}",
                wait_epoch, table_id
            )
        },
    )
    .await?;
    Ok(())
}

pub(crate) async fn wait_for_update(
    notifier: &tokio::sync::watch::Sender<PinnedVersion>,
    mut inspect_fn: impl FnMut(&PinnedVersion) -> HummockResult<bool>,
    mut periodic_debug_info: impl FnMut() -> String,
) -> HummockResult<()> {
    let mut receiver = notifier.subscribe();
    if inspect_fn(&receiver.borrow_and_update())? {
        return Ok(());
    }
    let start_time = Instant::now();
    loop {
        match tokio::time::timeout(Duration::from_secs(30), receiver.changed()).await {
            Err(_) => {
                let backtrace = cfg!(debug_assertions)
                    .then(Backtrace::capture)
                    .map(tracing::field::display);

                tracing::warn!(
                    info = periodic_debug_info(),
                    elapsed = ?start_time.elapsed(),
                    backtrace,
                    "timeout when waiting for version update",
                );
                continue;
            }
            Ok(Err(_)) => {
                return Err(HummockError::wait_epoch("tx dropped"));
            }
            Ok(Ok(_)) => {
                if inspect_fn(&receiver.borrow_and_update())? {
                    return Ok(());
                }
            }
        }
    }
}

pub struct HummockMemoryCollector {
    sstable_store: SstableStoreRef,
    limiter: Arc<MemoryLimiter>,
    storage_memory_config: StorageMemoryConfig,
}

impl HummockMemoryCollector {
    pub fn new(
        sstable_store: SstableStoreRef,
        limiter: Arc<MemoryLimiter>,
        storage_memory_config: StorageMemoryConfig,
    ) -> Self {
        Self {
            sstable_store,
            limiter,
            storage_memory_config,
        }
    }
}

impl MemoryCollector for HummockMemoryCollector {
    fn get_meta_memory_usage(&self) -> u64 {
        self.sstable_store.meta_cache().memory().usage() as _
    }

    fn get_data_memory_usage(&self) -> u64 {
        self.sstable_store.block_cache().memory().usage() as _
    }

    fn get_uploading_memory_usage(&self) -> u64 {
        self.limiter.get_memory_usage()
    }

    fn get_prefetch_memory_usage(&self) -> usize {
        self.sstable_store.get_prefetch_memory_usage()
    }

    fn get_meta_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.meta_cache().memory().usage() as f64
            / self.sstable_store.meta_cache().memory().capacity() as f64
    }

    fn get_block_cache_memory_usage_ratio(&self) -> f64 {
        self.sstable_store.block_cache().memory().usage() as f64
            / self.sstable_store.block_cache().memory().capacity() as f64
    }

    fn get_shared_buffer_usage_ratio(&self) -> f64 {
        self.limiter.get_memory_usage() as f64
            / (self.storage_memory_config.shared_buffer_capacity_mb * 1024 * 1024) as f64
    }
}

#[try_stream(ok = StateStoreKeyedRow, error = StorageError)]
pub(crate) async fn merge_stream<'a>(
    mem_table_iter: impl Iterator<Item = (&'a TableKey<Bytes>, &'a KeyOp)> + 'a,
    inner_stream: impl Stream<Item = StorageResult<StateStoreKeyedRow>> + 'static,
    table_id: TableId,
    epoch: u64,
    rev: bool,
) {
    let inner_stream = inner_stream.peekable();
    pin_mut!(inner_stream);

    let mut mem_table_iter = mem_table_iter.fuse().peekable();

    loop {
        match (inner_stream.as_mut().peek().await, mem_table_iter.peek()) {
            (None, None) => break,
            (Some(_), None) => {
                let (key, value) = inner_stream.next().await.unwrap()?;
                yield (key, value)
            }
            (None, Some(_)) => {
                let (key, key_op) = mem_table_iter.next().unwrap();
                match key_op {
                    KeyOp::Insert(value) | KeyOp::Update((_, value)) => {
                        yield (FullKey::new(table_id, key.clone(), epoch), value.clone())
                    }
                    _ => {}
                }
            }
            (Some(Ok((inner_key, _))), Some((mem_table_key, _))) => {
                debug_assert_eq!(inner_key.user_key.table_id, table_id);
                let mut ret = inner_key.user_key.table_key.cmp(mem_table_key);
                if rev {
                    ret = ret.reverse();
                }
                match ret {
                    Ordering::Less => {
                        let (key, value) = inner_stream.next().await.unwrap()?;
                        yield (key, value);
                    }
                    Ordering::Equal => {
                        let (_, key_op) = mem_table_iter.next().unwrap();
                        let (key, old_value_in_inner) = inner_stream.next().await.unwrap()?;
                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (key.clone(), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update((old_value, new_value)) => {
                                debug_assert!(old_value == &old_value_in_inner);

                                yield (key, new_value.clone());
                            }
                        }
                    }
                    Ordering::Greater => {
                        let (key, key_op) = mem_table_iter.next().unwrap();

                        match key_op {
                            KeyOp::Insert(value) => {
                                yield (FullKey::new(table_id, key.clone(), epoch), value.clone());
                            }
                            KeyOp::Delete(_) => {}
                            KeyOp::Update(_) => unreachable!(
                                "memtable update should always be paired with a storage key"
                            ),
                        }
                    }
                }
            }
            (Some(Err(_)), Some(_)) => {
                return Err(inner_stream.next().await.unwrap().unwrap_err());
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::future::{Future, poll_fn};
    use std::sync::Arc;
    use std::task::Poll;

    use futures::FutureExt;
    use futures::future::join_all;
    use rand::random_range;

    use crate::hummock::utils::MemoryLimiter;

    async fn assert_pending(future: &mut (impl Future + Unpin)) {
        for _ in 0..10 {
            assert!(
                poll_fn(|cx| Poll::Ready(future.poll_unpin(cx)))
                    .await
                    .is_pending()
            );
        }
    }

    #[tokio::test]
    async fn test_loose_memory_limiter() {
        let quota = 5;
        let memory_limiter = MemoryLimiter::new(quota);
        drop(memory_limiter.require_memory(6).await);
        let tracker1 = memory_limiter.require_memory(3).await;
        assert_eq!(3, memory_limiter.get_memory_usage());
        let tracker2 = memory_limiter.require_memory(4).await;
        assert_eq!(7, memory_limiter.get_memory_usage());
        let mut future = memory_limiter.require_memory(5).boxed();
        assert_pending(&mut future).await;
        assert_eq!(7, memory_limiter.get_memory_usage());
        drop(tracker1);
        let tracker3 = future.await;
        assert_eq!(9, memory_limiter.get_memory_usage());
        drop(tracker2);
        assert_eq!(5, memory_limiter.get_memory_usage());
        drop(tracker3);
        assert_eq!(0, memory_limiter.get_memory_usage());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_multi_thread_acquire_memory() {
        const QUOTA: u64 = 10;
        let memory_limiter = Arc::new(MemoryLimiter::new(200));
        let mut handles = vec![];
        for _ in 0..40 {
            let limiter = memory_limiter.clone();
            let h = tokio::spawn(async move {
                let mut buffers = vec![];
                let mut current_buffer_usage = random_range(2..=9);
                for _ in 0..1000 {
                    if buffers.len() < current_buffer_usage
                        && let Some(tracker) = limiter.try_require_memory(QUOTA)
                    {
                        buffers.push(tracker);
                    } else {
                        buffers.clear();
                        current_buffer_usage = random_range(2..=9);
                        let req = limiter.require_memory(QUOTA);
                        match tokio::time::timeout(std::time::Duration::from_millis(1), req).await {
                            Ok(tracker) => {
                                buffers.push(tracker);
                            }
                            Err(_) => {
                                continue;
                            }
                        }
                    }
                    let sleep_time = random_range(1..=3);
                    tokio::time::sleep(std::time::Duration::from_millis(sleep_time)).await;
                }
            });
            handles.push(h);
        }
        let h = join_all(handles);
        let _ = h.await;
    }
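
    // A minimal additional check added for illustration (not part of the original test
    // suite): `check_subset_preserve_order` should accept the subset only when its items
    // appear in the full iterator in the same relative order.
    #[test]
    fn test_check_subset_preserve_order() {
        use crate::hummock::utils::check_subset_preserve_order;

        assert!(check_subset_preserve_order(
            [1, 3].into_iter(),
            [1, 2, 3].into_iter()
        ));
        assert!(!check_subset_preserve_order(
            [3, 1].into_iter(),
            [1, 2, 3].into_iter()
        ));
    }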
}