1use std::cmp::Ordering;
16use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
17use std::ops::Bound::{Excluded, Included, Unbounded};
18use std::ops::{Bound, RangeBounds};
19use std::sync::{Arc, LazyLock};
20
21use bytes::Bytes;
22use itertools::Itertools;
23use parking_lot::RwLock;
24use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
25use risingwave_common::catalog::{TableId, TableOption};
26use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
27use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH};
28use risingwave_hummock_sdk::key::{
29 FullKey, TableKey, TableKeyRange, UserKey, prefixed_range_with_vnode,
30};
31use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
32use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
33use thiserror_ext::AsReport;
34use tokio::task::yield_now;
35use tracing::error;
36
37use crate::error::StorageResult;
38use crate::hummock::HummockError;
39use crate::hummock::utils::{
40 do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, merge_stream,
41 sanity_check_enabled,
42};
43use crate::mem_table::{KeyOp, MemTable};
44use crate::storage_value::StorageValue;
45use crate::store::*;
46
/// A full key (table id + table key + epoch) backed by `Bytes` storage.
pub type BytesFullKey = FullKey<Bytes>;
/// A bound pair over [`BytesFullKey`]s; the scan-range type of [`RangeKv`].
pub type BytesFullKeyRange = (Bound<BytesFullKey>, Bound<BytesFullKey>);
49
/// An ordered key-value store over [`BytesFullKey`]s, used as the storage
/// backend of [`RangeKvStateStore`]. A `None` value denotes a tombstone
/// (deleted key version).
#[allow(clippy::type_complexity)]
pub trait RangeKv: Clone + Send + Sync + 'static {
    /// Returns up to `limit` entries of `range` in ascending full-key order.
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    /// Returns up to `limit` entries of `range` in descending full-key order.
    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    /// Writes all pairs from `kv_pairs`; a `None` value writes a tombstone.
    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()>;

    /// Persists buffered writes, if the implementation buffers any.
    fn flush(&self) -> StorageResult<()>;
}
71
/// The default in-memory [`RangeKv`]: a shared, lock-protected `BTreeMap`.
pub type BTreeMapRangeKv = Arc<RwLock<BTreeMap<BytesFullKey, Option<Bytes>>>>;
73
74impl RangeKv for BTreeMapRangeKv {
75 fn range(
76 &self,
77 range: BytesFullKeyRange,
78 limit: Option<usize>,
79 ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
80 let limit = limit.unwrap_or(usize::MAX);
81 Ok(self
82 .read()
83 .range(range)
84 .take(limit)
85 .map(|(key, value)| (key.clone(), value.clone()))
86 .collect())
87 }
88
89 fn rev_range(
90 &self,
91 range: BytesFullKeyRange,
92 limit: Option<usize>,
93 ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
94 let limit = limit.unwrap_or(usize::MAX);
95 Ok(self
96 .read()
97 .range(range)
98 .rev()
99 .take(limit)
100 .map(|(key, value)| (key.clone(), value.clone()))
101 .collect())
102 }
103
104 fn ingest_batch(
105 &self,
106 kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
107 ) -> StorageResult<()> {
108 let mut inner = self.write();
109 for (key, value) in kv_pairs {
110 inner.insert(key, value);
111 }
112 Ok(())
113 }
114
115 fn flush(&self) -> StorageResult<()> {
116 Ok(())
117 }
118}
119
120pub mod sled {
121 use std::fs::create_dir_all;
122 use std::ops::RangeBounds;
123
124 use bytes::Bytes;
125 use risingwave_hummock_sdk::key::FullKey;
126
127 use crate::error::StorageResult;
128 use crate::memory::{BytesFullKey, BytesFullKeyRange, RangeKv, RangeKvStateStore};
129
    /// A [`RangeKv`] implementation persisted through an embedded `sled`
    /// database. `sled::Db` is internally shared, so `Clone` is cheap.
    #[derive(Clone)]
    pub struct SledRangeKv {
        inner: sled::Db,
    }

    impl SledRangeKv {
        /// Opens (or creates) a sled database at `path`.
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            SledRangeKv {
                inner: sled::open(path).expect("open"),
            }
        }

        /// Opens a sled database in a fresh temp directory under
        /// `./.risingwave/sled`.
        ///
        /// NOTE(review): `into_path` detaches the `TempDir` guard, so the
        /// directory is never removed automatically — presumably intentional
        /// so the database outlives this call; confirm cleanup happens
        /// elsewhere.
        pub fn new_temp() -> Self {
            create_dir_all("./.risingwave/sled").expect("should create");
            let path = tempfile::TempDir::new_in("./.risingwave/sled")
                .expect("find temp dir")
                .into_path();
            Self::new(path)
        }
    }

    // Leading tag byte of an encoded sled value: a lone `EMPTY` byte marks a
    // tombstone; `NON_EMPTY` is followed by the user value bytes.
    const EMPTY: u8 = 1;
    const NON_EMPTY: u8 = 0;
153
    impl RangeKv for SledRangeKv {
        /// Forward scan over the reverse-epoch-encoded keyspace.
        fn range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            // Keep the logical bound around: with variable-length table keys
            // the *encoded* byte range below can admit keys the logical range
            // excludes (see `test_filter_variable_key_length_false_positive`),
            // so every decoded key is re-checked against this bound.
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self.inner.range((left_encoded, right_encoded)).take(limit) {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                // Drop false positives of the encoded byte range.
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                // Values carry a leading tag byte; see `ingest_batch`.
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        /// Backward scan; identical filtering to `range`, descending order.
        fn rev_range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self
                .inner
                .range((left_encoded, right_encoded))
                .rev()
                .take(limit)
            {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                // Same false-positive re-check as in `range`.
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        /// Encodes each pair and applies them atomically via a sled batch.
        ///
        /// A present value is stored as `NON_EMPTY` + payload; a tombstone as
        /// a single `EMPTY` byte.
        fn ingest_batch(
            &self,
            kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
        ) -> StorageResult<()> {
            let mut batch = sled::Batch::default();
            for (key, value) in kv_pairs {
                let encoded_key = key.encode_reverse_epoch();
                let key = sled::IVec::from(encoded_key);
                // 1 extra byte for the tag.
                let mut buffer =
                    Vec::with_capacity(value.as_ref().map(|v| v.len()).unwrap_or_default() + 1);
                if let Some(value) = value {
                    buffer.push(NON_EMPTY);
                    buffer.extend_from_slice(value.as_ref());
                } else {
                    buffer.push(EMPTY);
                }
                let value = sled::IVec::from(buffer);
                batch.insert(key, value);
            }
            self.inner.apply_batch(batch)?;
            Ok(())
        }

        /// Blocks until sled has persisted pending writes.
        fn flush(&self) -> StorageResult<()> {
            Ok(self.inner.flush().map(|_| {})?)
        }
    }
251
    /// A [`RangeKvStateStore`] persisted through sled; mainly used in tests.
    pub type SledStateStore = RangeKvStateStore<SledRangeKv>;

    impl SledStateStore {
        /// Creates a store backed by a sled database at `path`.
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new(path),
                tables: Default::default(),
            }
        }

        /// Creates a store backed by a sled database in a temp directory.
        pub fn new_temp() -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new_temp(),
                tables: Default::default(),
            }
        }
    }
269
    #[cfg(test)]
    mod test {
        use std::ops::{Bound, RangeBounds};

        use bytes::Bytes;
        use risingwave_common::catalog::TableId;
        use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
        use risingwave_hummock_sdk::EpochWithGap;
        use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

        use crate::memory::RangeKv;
        use crate::memory::sled::SledRangeKv;

        /// With variable-length table keys, the *encoded* byte range can admit
        /// keys that the logical full-key range excludes. This constructs such
        /// a false positive and checks that `SledRangeKv::range` filters it.
        #[test]
        fn test_filter_variable_key_length_false_positive() {
            let table_id = TableId { table_id: 233 };
            let epoch = u64::MAX - u64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]);
            // Short key: a prefix of the range start, so it encodes *inside*
            // the byte range but is logically below the start bound.
            let excluded_short_table_key = [0, 1, 0, 0];
            let included_long_table_key = [0, 1, 0, 0, 1, 2];
            let left_table_key = [0, 1, 0, 0, 1];
            let right_table_key = [0, 1, 1, 1];

            let to_full_key = |table_key: &[u8]| FullKey {
                user_key: UserKey {
                    table_id,
                    table_key: TableKey(Bytes::from(table_key.to_vec())),
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(epoch & !EPOCH_SPILL_TIME_MASK),
            };

            let left_full_key = to_full_key(&left_table_key[..]);
            let right_full_key = to_full_key(&right_table_key[..]);
            let included_long_full_key = to_full_key(&included_long_table_key[..]);
            let excluded_short_full_key = to_full_key(&excluded_short_table_key[..]);

            // The logical range includes the long key but not the short one.
            assert!(
                (
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&included_long_full_key.to_ref())
            );
            assert!(
                !(
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&excluded_short_full_key.to_ref())
            );

            let left_encoded = left_full_key.encode_reverse_epoch();
            let right_encoded = right_full_key.encode_reverse_epoch();

            // But the *encoded* byte range contains both: the false positive.
            assert!(
                (
                    Bound::Included(left_encoded.clone()),
                    Bound::Included(right_encoded.clone())
                )
                    .contains(&included_long_full_key.encode_reverse_epoch())
            );
            assert!(
                (
                    Bound::Included(left_encoded),
                    Bound::Included(right_encoded)
                )
                    .contains(&excluded_short_full_key.encode_reverse_epoch())
            );

            let sled_range_kv = SledRangeKv::new_temp();
            sled_range_kv
                .ingest_batch(
                    vec![
                        (included_long_full_key.clone(), None),
                        (excluded_short_full_key, None),
                    ]
                    .into_iter(),
                )
                .unwrap();
            // `range` must return only the logically-included key.
            let kvs = sled_range_kv
                .range(
                    (
                        Bound::Included(left_full_key),
                        Bound::Included(right_full_key),
                    ),
                    None,
                )
                .unwrap();
            assert_eq!(1, kvs.len());
            assert_eq!(included_long_full_key.to_ref(), kvs[0].0.to_ref());
            assert!(kvs[0].1.is_none());
        }
    }
362}
363
364mod batched_iter {
365
366 use super::*;
367
    /// Iterator over a [`RangeKv`] that pulls entries in fixed-size batches so
    /// the underlying store (and any lock it takes) is only accessed briefly.
    pub struct Iter<R: RangeKv> {
        inner: R,
        // Remaining range; shrunk after each refill to resume past the last
        // key already returned.
        range: BytesFullKeyRange,
        // Entries of the last fetched batch not yet handed out.
        current: std::vec::IntoIter<(FullKey<Bytes>, Option<Bytes>)>,
        // `true` for a descending scan.
        rev: bool,
    }

    impl<R: RangeKv> Iter<R> {
        /// Creates a batched iterator over `range`; `rev` selects direction.
        pub fn new(inner: R, range: BytesFullKeyRange, rev: bool) -> Self {
            Self {
                inner,
                range,
                rev,
                current: Vec::new().into_iter(),
            }
        }
    }
391
    impl<R: RangeKv> Iter<R> {
        /// Maximum number of entries fetched from the backend per refill.
        const BATCH_SIZE: usize = 256;

        /// Fetches the next batch into `self.current` and shrinks the stored
        /// range so the following refill resumes after the last key seen.
        fn refill(&mut self) -> StorageResult<()> {
            // Only called once the previous batch is fully consumed.
            assert!(self.current.is_empty());

            let batch = if self.rev {
                self.inner.rev_range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            } else {
                self.inner.range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            };

            if let Some((last_key, _)) = batch.last() {
                let full_key = FullKey::new_with_gap_epoch(
                    last_key.user_key.table_id,
                    TableKey(last_key.user_key.table_key.0.clone()),
                    last_key.epoch_with_gap,
                );
                // Exclude the already-returned key from the remaining range:
                // shrink the upper bound for reverse scans, the lower bound
                // for forward scans.
                if self.rev {
                    self.range.1 = Bound::Excluded(full_key);
                } else {
                    self.range.0 = Bound::Excluded(full_key);
                }
            }
            self.current = batch.into_iter();
            Ok(())
        }
    }
427
428 impl<R: RangeKv> Iter<R> {
429 #[allow(clippy::type_complexity)]
430 pub fn next(&mut self) -> StorageResult<Option<(BytesFullKey, Option<Bytes>)>> {
431 match self.current.next() {
432 Some((key, value)) => Ok(Some((key, value))),
433 None => {
434 self.refill()?;
435 Ok(self.current.next())
436 }
437 }
438 }
439 }
440
    #[cfg(test)]
    mod tests {
        use rand::Rng;

        use super::*;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_btreemap_iter_chaos() {
            let map = Arc::new(RwLock::new(BTreeMap::new()));
            test_iter_chaos_inner(map, 1000);
        }

        #[cfg(not(madsim))]
        #[test]
        fn test_sled_iter_chaos() {
            let map = SledRangeKv::new_temp();
            test_iter_chaos_inner(map, 100);
        }

        /// Fuzzes the batched `Iter` against the backend's plain `range` on
        /// `count` random ranges; both must yield identical results.
        fn test_iter_chaos_inner(map: impl RangeKv, count: usize) {
            let key_range = 1..=10000;
            let num_to_bytes = |k: i32| Bytes::from(format!("{:06}", k).as_bytes().to_vec());
            let num_to_full_key =
                |k: i32| FullKey::new(TableId::default(), TableKey(num_to_bytes(k)), 0);
            #[allow(clippy::mutable_key_type)]
            map.ingest_batch(key_range.clone().map(|k| {
                let key = num_to_full_key(k);
                let b = key.user_key.table_key.0.clone();

                (key, Some(b))
            }))
            .unwrap();

            let rand_bound = || {
                let key = rand::rng().random_range(key_range.clone());
                let key = num_to_full_key(key);
                match rand::rng().random_range(1..=5) {
                    1 | 2 => Bound::Included(key),
                    3 | 4 => Bound::Excluded(key),
                    _ => Bound::Unbounded,
                }
            };

            for _ in 0..count {
                // Re-sample until the bounds form a valid (non-inverted,
                // non-empty-excluded) range.
                let range = loop {
                    let range = (rand_bound(), rand_bound());
                    let (start, end) = (range.start_bound(), range.end_bound());

                    match (start, end) {
                        (Bound::Excluded(s), Bound::Excluded(e)) if s == e => {
                            continue;
                        }
                        (
                            Bound::Included(s) | Bound::Excluded(s),
                            Bound::Included(e) | Bound::Excluded(e),
                        ) if s > e => {
                            continue;
                        }
                        _ => break range,
                    }
                };

                // v1: drained through the batched iterator.
                let v1 = {
                    let mut v = vec![];
                    let mut iter = Iter::new(map.clone(), range.clone(), false);
                    while let Some((key, value)) = iter.next().unwrap() {
                        v.push((key, value));
                    }
                    v
                };
                // v2: a single unbatched scan, the reference result.
                let v2 = map.range(range, None).unwrap();

                assert_eq!(v1, v2);
            }
        }
    }
520}
521
/// An in-memory state store (BTreeMap-backed), mainly for tests and CI.
pub type MemoryStateStore = RangeKvStateStore<BTreeMapRangeKv>;
523
/// Per-table epoch bookkeeping used to emulate epoch sealing and waiting.
struct TableState {
    // The `prev` epoch passed at `init`; waits at this epoch return at once.
    init_epoch: u64,
    // Maps each epoch to its successor; fed by `init`/`seal_current_epoch`.
    next_epochs: BTreeMap<u64, u64>,
    // Greatest epoch sealed by all vnodes so far, if any.
    latest_sealed_epoch: Option<u64>,
    // Epochs being sealed -> bitmap of vnodes that have sealed them.
    sealing_epochs: BTreeMap<u64, BitmapBuilder>,
}
530
impl TableState {
    /// Creates the state for a table whose first `prev` epoch is `init_epoch`.
    fn new(init_epoch: u64) -> Self {
        Self {
            init_epoch,
            next_epochs: Default::default(),
            latest_sealed_epoch: None,
            sealing_epochs: Default::default(),
        }
    }

    /// Cooperatively busy-waits (via `yield_now`) until `epoch` of `table_id`
    /// is either the table's init epoch or has been sealed by all vnodes.
    ///
    /// The mutex guard is dropped before every yield so writers can progress.
    async fn wait_epoch(
        tables: &parking_lot::Mutex<HashMap<TableId, Self>>,
        table_id: TableId,
        epoch: u64,
    ) {
        loop {
            {
                let tables = tables.lock();
                let table_state = tables.get(&table_id).expect("should exist");
                // Waiting for an epoch older than init would never terminate.
                assert!(epoch >= table_state.init_epoch);
                if epoch == table_state.init_epoch {
                    return;
                }
                if let Some(latest_sealed_epoch) = table_state.latest_sealed_epoch
                    && latest_sealed_epoch >= epoch
                {
                    return;
                }
            }
            yield_now().await;
        }
    }
}
564
/// A [`StateStore`] implementation on top of any [`RangeKv`] backend; serves
/// as an in-memory (or sled-backed) stand-in for the real Hummock store.
#[derive(Clone, Default)]
pub struct RangeKvStateStore<R: RangeKv> {
    // Stores (full key = table key + epoch) -> optional user value.
    inner: R,
    // Per-table epoch state, shared with all local stores created from here.
    tables: Arc<parking_lot::Mutex<HashMap<TableId, TableState>>>,
}
577
578fn to_full_key_range<R, B>(table_id: TableId, table_key_range: R) -> BytesFullKeyRange
579where
580 R: RangeBounds<B> + Send,
581 B: AsRef<[u8]>,
582{
583 let start = match table_key_range.start_bound() {
584 Included(k) => Included(FullKey::new(
585 table_id,
586 TableKey(Bytes::from(k.as_ref().to_vec())),
587 HummockEpoch::MAX,
588 )),
589 Excluded(k) => Excluded(FullKey::new(
590 table_id,
591 TableKey(Bytes::from(k.as_ref().to_vec())),
592 0,
593 )),
594 Unbounded => Included(FullKey::new(
595 table_id,
596 TableKey(Bytes::from(b"".to_vec())),
597 HummockEpoch::MAX,
598 )),
599 };
600 let end = match table_key_range.end_bound() {
601 Included(k) => Included(FullKey::new(
602 table_id,
603 TableKey(Bytes::from(k.as_ref().to_vec())),
604 0,
605 )),
606 Excluded(k) => Excluded(FullKey::new(
607 table_id,
608 TableKey(Bytes::from(k.as_ref().to_vec())),
609 HummockEpoch::MAX,
610 )),
611 Unbounded => {
612 if let Some(next_table_id) = table_id.table_id().checked_add(1) {
613 Excluded(FullKey::new(
614 next_table_id.into(),
615 TableKey(Bytes::from(b"".to_vec())),
616 HummockEpoch::MAX,
617 ))
618 } else {
619 Unbounded
620 }
621 }
622 };
623 (start, end)
624}
625
626impl MemoryStateStore {
627 pub fn new() -> Self {
628 Self::default()
629 }
630
631 pub fn shared() -> Self {
632 static STORE: LazyLock<MemoryStateStore> = LazyLock::new(MemoryStateStore::new);
633 STORE.clone()
634 }
635}
636
637impl<R: RangeKv> RangeKvStateStore<R> {
638 fn scan(
639 &self,
640 key_range: TableKeyRange,
641 epoch: u64,
642 table_id: TableId,
643 limit: Option<usize>,
644 ) -> StorageResult<Vec<(Bytes, Bytes)>> {
645 let mut data = vec![];
646 if limit == Some(0) {
647 return Ok(vec![]);
648 }
649 let mut last_user_key = None;
650 for (key, value) in self
651 .inner
652 .range(to_full_key_range(table_id, key_range), None)?
653 {
654 if key.epoch_with_gap.pure_epoch() > epoch {
655 continue;
656 }
657 if Some(&key.user_key) != last_user_key.as_ref() {
658 if let Some(value) = value {
659 data.push((Bytes::from(key.encode()), value.clone()));
660 }
661 last_user_key = Some(key.user_key.clone());
662 }
663 if let Some(limit) = limit
664 && data.len() >= limit
665 {
666 break;
667 }
668 }
669 Ok(data)
670 }
671}
672
/// A read-only view of a [`RangeKvStateStore`] pinned to a fixed epoch and
/// table.
#[derive(Clone)]
pub struct RangeKvStateStoreReadSnapshot<R: RangeKv> {
    inner: RangeKvStateStore<R>,
    epoch: u64,
    table_id: TableId,
}

impl<R: RangeKv> StateStoreRead for RangeKvStateStoreReadSnapshot<R> {
    type Iter = RangeKvStateStoreIter<R>;
    type RevIter = RangeKvStateStoreRevIter<R>;

    /// Point get at the snapshot's epoch; `_read_options` is unused here.
    async fn get_keyed_row(
        &self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        self.inner
            .get_keyed_row_impl(key, self.epoch, self.table_id)
    }

    /// Forward range scan at the snapshot's epoch.
    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter> {
        self.inner.iter_impl(key_range, self.epoch, self.table_id)
    }

    /// Backward range scan at the snapshot's epoch.
    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter> {
        self.inner
            .rev_iter_impl(key_range, self.epoch, self.table_id)
    }
}
710
711impl<R: RangeKv> RangeKvStateStore<R> {
712 fn get_keyed_row_impl(
713 &self,
714 key: TableKey<Bytes>,
715 epoch: u64,
716 table_id: TableId,
717 ) -> StorageResult<Option<StateStoreKeyedRow>> {
718 let range_bounds = (Bound::Included(key.clone()), Bound::Included(key));
719 let res = self.scan(range_bounds, epoch, table_id, Some(1))?;
721
722 Ok(match res.as_slice() {
723 [] => None,
724 [(key, value)] => Some((
725 FullKey::decode(key.as_ref()).to_vec().into_bytes(),
726 value.clone(),
727 )),
728 _ => unreachable!(),
729 })
730 }
731
732 fn iter_impl(
733 &self,
734 key_range: TableKeyRange,
735 epoch: u64,
736 table_id: TableId,
737 ) -> StorageResult<RangeKvStateStoreIter<R>> {
738 Ok(RangeKvStateStoreIter::new(
739 batched_iter::Iter::new(
740 self.inner.clone(),
741 to_full_key_range(table_id, key_range),
742 false,
743 ),
744 epoch,
745 true,
746 ))
747 }
748
749 fn rev_iter_impl(
750 &self,
751 key_range: TableKeyRange,
752 epoch: u64,
753 table_id: TableId,
754 ) -> StorageResult<RangeKvStateStoreRevIter<R>> {
755 Ok(RangeKvStateStoreRevIter::new(
756 batched_iter::Iter::new(
757 self.inner.clone(),
758 to_full_key_range(table_id, key_range),
759 true,
760 ),
761 epoch,
762 true,
763 ))
764 }
765}
766
impl<R: RangeKv> StateStoreReadLog for RangeKvStateStore<R> {
    type ChangeLogIter = RangeKvStateStoreChangeLogIter<R>;

    /// Resolves the epoch that follows `epoch` for `options.table_id`,
    /// cooperatively waiting until the successor has been registered by
    /// `init`/`seal_current_epoch`. Errors if the table is unknown.
    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        loop {
            {
                let tables = self.tables.lock();
                let Some(tables) = tables.get(&options.table_id) else {
                    return Err(HummockError::next_epoch(format!(
                        "table {} not exist",
                        options.table_id
                    ))
                    .into());
                };
                if let Some(next_epoch) = tables.next_epochs.get(&epoch) {
                    break Ok(*next_epoch);
                }
            }
            // Successor not registered yet: release the lock and retry.
            yield_now().await;
        }
    }

    /// Builds a change-log iterator over `key_range`: new values are read at
    /// `max_epoch` (inclusive) and old values at `min_epoch` (exclusive).
    async fn iter_log(
        &self,
        (min_epoch, max_epoch): (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let new_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range.clone()),
                false,
            ),
            max_epoch,
            true,
        );
        let old_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range),
                false,
            ),
            min_epoch,
            false,
        );
        RangeKvStateStoreChangeLogIter::new(new_value_iter, old_value_iter)
    }
}
816
impl<R: RangeKv> RangeKvStateStore<R> {
    /// Creates a read snapshot of this store pinned at `epoch` for `table_id`.
    fn new_read_snapshot_impl(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> RangeKvStateStoreReadSnapshot<R> {
        RangeKvStateStoreReadSnapshot {
            inner: self.clone(),
            epoch,
            table_id,
        }
    }

    /// Ingests a write batch at `write_options.epoch`, returning the
    /// approximate number of payload bytes written.
    ///
    /// `delete_ranges` are materialized into point tombstones: each range is
    /// scanned and every existing key within it is appended as a delete.
    pub(crate) fn ingest_batch(
        &self,
        mut kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
        delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
        write_options: WriteOptions,
    ) -> StorageResult<usize> {
        let epoch = write_options.epoch;

        let mut delete_keys = BTreeSet::new();
        for del_range in delete_ranges {
            for (key, _) in self.inner.range(
                (
                    del_range.0.map(|table_key| {
                        FullKey::new(write_options.table_id, TableKey(table_key), epoch)
                    }),
                    del_range.1.map(|table_key| {
                        FullKey::new(write_options.table_id, TableKey(table_key), epoch)
                    }),
                ),
                None,
            )? {
                // Set-insert deduplicates keys covered by multiple versions
                // or overlapping ranges.
                delete_keys.insert(key.user_key.table_key);
            }
        }
        for key in delete_keys {
            kv_pairs.push((key, StorageValue::new_delete()));
        }

        // `size` is accumulated inside the mapping closure as pairs are
        // encoded and handed to the backend.
        let mut size = 0;
        self.inner
            .ingest_batch(kv_pairs.into_iter().map(|(key, value)| {
                size += key.len() + value.size();
                (
                    FullKey::new(write_options.table_id, key, epoch),
                    value.user_value,
                )
            }))?;
        Ok(size)
    }
}
870
impl<R: RangeKv> StateStore for RangeKvStateStore<R> {
    type Local = RangeKvLocalStateStore<R>;
    type ReadSnapshot = RangeKvStateStoreReadSnapshot<R>;

    /// No-op: writes to this store become visible as soon as they are
    /// ingested, so there is nothing to wait for.
    async fn try_wait_epoch(
        &self,
        _epoch: HummockReadEpoch,
        _options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        Ok(())
    }

    /// Creates a local (per-executor) state store sharing this store's data.
    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
        RangeKvLocalStateStore::new(self.clone(), option)
    }

    /// Creates a read snapshot pinned at the resolved epoch of `epoch`.
    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        Ok(self.new_read_snapshot_impl(epoch.get_epoch(), options.table_id))
    }
}
896
/// A local state store that buffers uncommitted writes in a [`MemTable`] and
/// flushes them into the shared [`RangeKvStateStore`] on `flush`.
pub struct RangeKvLocalStateStore<R: RangeKv> {
    mem_table: MemTable,
    inner: RangeKvStateStore<R>,

    // `None` until `init` is called; holds (prev, curr) epochs afterwards.
    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,
    // Vnodes owned by this local store; drives epoch-sealing coordination.
    vnodes: Arc<Bitmap>,
}

impl<R: RangeKv> RangeKvLocalStateStore<R> {
    /// Creates an uninitialized local store; `init` must be called before use.
    pub fn new(inner: RangeKvStateStore<R>, option: NewLocalOptions) -> Self {
        Self {
            inner,
            mem_table: MemTable::new(option.op_consistency_level.clone()),
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            vnodes: option.vnodes,
        }
    }
}
922
923impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
924 type FlushedSnapshotReader = RangeKvStateStoreReadSnapshot<R>;
925
926 type Iter<'a> = impl StateStoreIter + 'a;
927 type RevIter<'a> = impl StateStoreIter + 'a;
928
929 async fn get(
930 &self,
931 key: TableKey<Bytes>,
932 _read_options: ReadOptions,
933 ) -> StorageResult<Option<Bytes>> {
934 match self.mem_table.buffer.get(&key) {
935 None => self
936 .inner
937 .get_keyed_row_impl(key, self.epoch(), self.table_id)
938 .map(|option| option.map(|(_, value)| value)),
939 Some(op) => match op {
940 KeyOp::Insert(value) | KeyOp::Update((_, value)) => Ok(Some(value.clone())),
941 KeyOp::Delete(_) => Ok(None),
942 },
943 }
944 }
945
    /// Forward iterator that merges uncommitted mem-table operations over the
    /// committed store's stream at the current epoch.
    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let iter = self
            .inner
            .iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            false,
        ))))
    }

    /// Reverse counterpart of `iter` (the final `true` flags reverse order).
    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let iter = self
            .inner
            .rev_iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.rev_iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            true,
        ))))
    }
979
980 fn insert(
981 &mut self,
982 key: TableKey<Bytes>,
983 new_val: Bytes,
984 old_val: Option<Bytes>,
985 ) -> StorageResult<()> {
986 match old_val {
987 None => self.mem_table.insert(key, new_val)?,
988 Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
989 };
990 Ok(())
991 }
992
993 fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
994 Ok(self.mem_table.delete(key, old_val)?)
995 }
996
    /// Drains the mem-table and writes its operations into the shared store at
    /// the current epoch, returning the approximate byte size written.
    ///
    /// When sanity checks are enabled, every operation is validated against a
    /// snapshot of the already-flushed data before being ingested.
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        // Snapshot at MAX_EPOCH: sees everything flushed so far.
        let sanity_check_read_snapshot = if sanity_check_enabled() {
            Some(self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id))
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                // Inserts must not collide with an existing key.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_insert_sanity_check(
                            &key,
                            &value,
                            sanity_check_read_snapshot,
                            self.table_id,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(value)));
                }
                // Deletes must target an existing key with the expected value.
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_delete_sanity_check(
                            &key,
                            &old_value,
                            sanity_check_read_snapshot,
                            self.table_id,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_delete()));
                }
                // Updates must match the recorded old value.
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_update_sanity_check(
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_read_snapshot,
                            self.table_id,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(new_value)));
                }
            }
        }
        self.inner.ingest_batch(
            kv_pairs,
            vec![],
            WriteOptions {
                epoch: self.epoch(),
                table_id: self.table_id,
            },
        )
    }
1064
    /// The current (`curr`) epoch; panics if `init` has not been called yet.
    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }

    /// Whether the mem-table holds unflushed operations.
    fn is_dirty(&self) -> bool {
        self.mem_table.is_dirty()
    }
1072
    /// Initializes the store with its first epoch pair and registers the
    /// `prev -> curr` transition in the shared table state.
    ///
    /// Panics if called more than once on the same local store.
    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        assert_eq!(
            self.epoch.replace(options.epoch),
            None,
            "epoch in local state store of table id {:?} is init for more than once",
            self.table_id
        );
        self.inner
            .tables
            .lock()
            .entry(self.table_id)
            .or_insert_with(|| TableState::new(options.epoch.prev))
            .next_epochs
            .insert(options.epoch.prev, options.epoch.curr);
        if self.vnodes.len() > 1 {
            // Multi-vnode table: wait until `prev` is sealed by all vnodes so
            // reads at `prev` observe a consistent state.
            TableState::wait_epoch(&self.inner.tables, self.table_id, options.epoch.prev).await;
        }

        Ok(())
    }
1093
    /// Seals the current epoch and advances to `next_epoch`.
    ///
    /// Marks this store's vnodes as having sealed the previous epoch; once
    /// every vnode has done so, that epoch becomes the table's latest sealed
    /// epoch (which unblocks `TableState::wait_epoch`). Table watermarks, if
    /// provided, are applied immediately as range deletes.
    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
        // Sealing with unflushed data would silently drop it.
        assert!(!self.is_dirty());
        if let Some(value_checker) = opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(&value_checker);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        let mut tables = self.inner.tables.lock();
        let table_state = tables
            .get_mut(&self.table_id)
            .expect("should be set when init");

        table_state.next_epochs.insert(prev_epoch, next_epoch);
        if self.vnodes.len() > 1 {
            // Record our vnodes in the sealing bitmap of `prev_epoch`.
            let sealing_epoch_vnodes = table_state
                .sealing_epochs
                .entry(prev_epoch)
                .or_insert_with(|| BitmapBuilder::zeroed(self.vnodes.len()));
            assert_eq!(self.vnodes.len(), sealing_epoch_vnodes.len());
            for vnode in self.vnodes.iter_ones() {
                // Each vnode may seal a given epoch exactly once.
                assert!(!sealing_epoch_vnodes.is_set(vnode));
                sealing_epoch_vnodes.set(vnode, true);
            }
            if (0..self.vnodes.len()).all(|vnode| sealing_epoch_vnodes.is_set(vnode)) {
                // All vnodes sealed `prev_epoch`. It must be the smallest
                // pending epoch, which the following assert enforces.
                let (all_sealed_epoch, _) =
                    table_state.sealing_epochs.pop_first().expect("non-empty");
                assert_eq!(
                    all_sealed_epoch, prev_epoch,
                    "new all_sealed_epoch must be the current prev epoch"
                );
                if let Some(prev_latest_sealed_epoch) =
                    table_state.latest_sealed_epoch.replace(prev_epoch)
                {
                    // Sealed epochs must advance monotonically.
                    assert!(prev_epoch > prev_latest_sealed_epoch);
                }
            }
        }

        if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks {
            // Translate each vnode watermark into per-vnode delete ranges:
            // ascending watermarks delete keys below the watermark,
            // descending ones delete keys above it.
            let delete_ranges = watermarks
                .iter()
                .flat_map(|vnode_watermark| {
                    let inner_range = match direction {
                        WatermarkDirection::Ascending => {
                            (Unbounded, Excluded(vnode_watermark.watermark().clone()))
                        }
                        WatermarkDirection::Descending => {
                            (Excluded(vnode_watermark.watermark().clone()), Unbounded)
                        }
                    };
                    vnode_watermark
                        .vnode_bitmap()
                        .iter_vnodes()
                        .map(move |vnode| {
                            let (start, end) =
                                prefixed_range_with_vnode(inner_range.clone(), vnode);
                            (start.map(|key| key.0.clone()), end.map(|key| key.0.clone()))
                        })
                })
                .collect_vec();
            // Best-effort: a failed watermark write is logged, not propagated.
            if let Err(e) = self.inner.ingest_batch(
                Vec::new(),
                delete_ranges,
                WriteOptions {
                    epoch: self.epoch(),
                    table_id: self.table_id,
                },
            ) {
                error!(error = %e.as_report(), "failed to write delete ranges of table watermark");
            }
        }
    }
1178
    /// No-op: this implementation only persists data on an explicit `flush`.
    async fn try_flush(&mut self) -> StorageResult<()> {
        Ok(())
    }

    /// Swaps in a new vnode bitmap, first waiting (for multi-vnode tables)
    /// until the previous epoch has been sealed by all vnodes. Returns the
    /// bitmap that was replaced.
    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(
                &self.inner.tables,
                self.table_id,
                self.epoch.expect("should have init").prev,
            )
            .await;
        }
        Ok(std::mem::replace(&mut self.vnodes, vnodes))
    }

    /// Always `None`: watermarks are applied eagerly as range deletes in
    /// `seal_current_epoch`, so there is no pending watermark to report.
    fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
        None
    }

    /// A snapshot over already-flushed data (pinned at `MAX_EPOCH`).
    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id)
    }
1203}
1204
/// Forward iterator over a [`RangeKv`] that yields, per user key, the newest
/// version visible at `epoch` and skips tombstones.
pub struct RangeKvStateStoreIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    // If false, entries whose epoch equals `epoch` are excluded as well.
    is_inclusive_epoch: bool,

    // Last user key seen (yielded or shadowed); collapses older versions.
    last_key: Option<UserKey<Bytes>>,

    // The row produced by the most recent `try_next`, if any.
    item_buffer: Option<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    /// Creates an iterator visible at `epoch`; see `is_inclusive_epoch` above.
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            last_key: None,
            item_buffer: None,
        }
    }
}
1231
impl<R: RangeKv> StateStoreIter for RangeKvStateStoreIter<R> {
    /// Advances to the next visible row and returns a borrowed view of it.
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    /// Scans forward until the next visible row is found (or the input is
    /// exhausted), leaving the result in `item_buffer`.
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer = None;
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            // Versions newer than the read epoch are invisible.
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }
            // Keys arrive ordered by (user key asc, epoch desc): the first
            // visible entry per user key wins; later (older) versions skip.
            // A tombstone still records the key, shadowing older versions.
            if Some(key.user_key.as_ref()) != self.last_key.as_ref().map(|key| key.as_ref()) {
                self.last_key = Some(key.user_key.clone());
                if let Some(value) = value {
                    self.item_buffer = Some((key, value));
                    break;
                }
            }
        }
        Ok(())
    }
}
1264
/// Backward (descending user-key order) iterator over a [`RangeKv`] range that
/// yields, for each user key, the newest value visible at the read epoch.
pub struct RangeKvStateStoreRevIter<R: RangeKv> {
    // Underlying batched iterator returning raw entries in reverse full-key
    // order; the replacement logic in `next_inner` relies on later entries of
    // the same user key carrying newer epochs.
    inner: batched_iter::Iter<R>,

    // Read epoch: versions newer than this are invisible.
    epoch: HummockEpoch,
    // Whether entries written exactly at `epoch` are visible.
    is_inclusive_epoch: bool,

    // Pending rows: the in-progress candidate for the current user key sits at
    // the front; a finalized row ready to yield sits at the back.
    item_buffer: VecDeque<StateStoreKeyedRow>,
}
1273
1274impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
1275 pub fn new(
1276 inner: batched_iter::Iter<R>,
1277 epoch: HummockEpoch,
1278 is_inclusive_epoch: bool,
1279 ) -> Self {
1280 Self {
1281 inner,
1282 epoch,
1283 is_inclusive_epoch,
1284 item_buffer: VecDeque::default(),
1285 }
1286 }
1287}
1288
1289impl<R: RangeKv> StateStoreIter for RangeKvStateStoreRevIter<R> {
1290 async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
1291 self.next_inner()?;
1292 Ok(self
1293 .item_buffer
1294 .back()
1295 .map(|(key, value)| (key.to_ref(), value.as_ref())))
1296 }
1297}
1298
impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    /// Advances to the next visible row (descending user-key order), leaving it
    /// at the back of `item_buffer`; the buffer is empty on exhaustion.
    ///
    /// Reverse full-key order means user keys descend while, within one user
    /// key, versions appear oldest-first (the replacement branch below relies
    /// on this). The front of `item_buffer` holds the current candidate row,
    /// which later (newer) versions of the same key may overwrite or delete
    /// before the arrival of the next user key finalizes it.
    fn next_inner(&mut self) -> StorageResult<()> {
        // Drop the row handed out by the previous call.
        self.item_buffer.pop_back();
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            // Skip versions not visible at the read epoch.
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }

            let v = match value {
                Some(v) => v,
                None => {
                    // A tombstone (being newer than the buffered candidate of
                    // the same user key) deletes that candidate.
                    if let Some(last_key) = self.item_buffer.front()
                        && key.user_key.as_ref() == last_key.0.user_key.as_ref()
                    {
                        self.item_buffer.clear();
                    }
                    continue;
                }
            };

            if let Some(last_key) = self.item_buffer.front() {
                if key.user_key.as_ref() != last_key.0.user_key.as_ref() {
                    // New (smaller) user key: the front candidate is final and
                    // becomes `back()`; stage the new key as the candidate.
                    self.item_buffer.push_front((key, v));
                    break;
                } else {
                    // Newer version of the same user key replaces the candidate.
                    self.item_buffer.pop_front();
                    self.item_buffer.push_front((key, v));
                }
            } else {
                // First candidate after the buffer was drained.
                self.item_buffer.push_front((key, v));
            }
        }
        Ok(())
    }
}
1338
/// Iterator producing change-log entries (insert/delete/update) by
/// merge-joining a new-value iterator with an old-value iterator on user key.
pub struct RangeKvStateStoreChangeLogIter<R: RangeKv> {
    // Iterator over the new-value side of the log.
    new_value_iter: RangeKvStateStoreIter<R>,
    // Iterator over the old-value side of the log.
    old_value_iter: RangeKvStateStoreIter<R>,
    // Entry to be returned by the next `try_next` call, if any.
    item_buffer: Option<(TableKey<Bytes>, ChangeLogValue<Bytes>)>,
}
1344
1345impl<R: RangeKv> RangeKvStateStoreChangeLogIter<R> {
1346 fn new(
1347 mut new_value_iter: RangeKvStateStoreIter<R>,
1348 mut old_value_iter: RangeKvStateStoreIter<R>,
1349 ) -> StorageResult<Self> {
1350 new_value_iter.next_inner()?;
1351 old_value_iter.next_inner()?;
1352 Ok(Self {
1353 new_value_iter,
1354 old_value_iter,
1355 item_buffer: None,
1356 })
1357 }
1358}
1359
impl<R: RangeKv> StateStoreIter<StateStoreReadLogItem> for RangeKvStateStoreChangeLogIter<R> {
    /// Merge-joins the new-value and old-value iterators by user key and emits
    /// the next change-log entry:
    /// * key present only on the new side -> `Insert`
    /// * key present only on the old side -> `Delete`
    /// * key present on both sides        -> `Update`, unless both values are
    ///   identical, in which case the key is skipped entirely.
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
        loop {
            match (
                &self.new_value_iter.item_buffer,
                &self.old_value_iter.item_buffer,
            ) {
                (None, None) => {
                    // Both sides exhausted: end of the log.
                    self.item_buffer = None;
                    break;
                }
                (Some((key, new_value)), None) => {
                    // Old side exhausted: everything remaining is an insert.
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Insert(new_value.clone()),
                    ));
                    self.new_value_iter.next_inner()?;
                }
                (None, Some((key, old_value))) => {
                    // New side exhausted: everything remaining is a delete.
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Delete(old_value.clone()),
                    ));
                    self.old_value_iter.next_inner()?;
                }
                (Some((new_value_key, new_value)), Some((old_value_key, old_value))) => {
                    match new_value_key.user_key.cmp(&old_value_key.user_key) {
                        Ordering::Less => {
                            // Key exists only on the new side.
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Insert(new_value.clone()),
                            ));
                            self.new_value_iter.next_inner()?;
                        }
                        Ordering::Greater => {
                            // Key exists only on the old side.
                            self.item_buffer = Some((
                                old_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Delete(old_value.clone()),
                            ));
                            self.old_value_iter.next_inner()?;
                        }
                        Ordering::Equal => {
                            if new_value == old_value {
                                // Unchanged value: not part of the change log;
                                // advance both sides and keep looking.
                                self.new_value_iter.next_inner()?;
                                self.old_value_iter.next_inner()?;
                                continue;
                            }
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Update {
                                    new_value: new_value.clone(),
                                    old_value: old_value.clone(),
                                },
                            ));
                            self.new_value_iter.next_inner()?;
                            self.old_value_iter.next_inner()?;
                        }
                    }
                }
            };
            break;
        }
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.to_ref())))
    }
}
1428
1429#[cfg(test)]
1430mod tests {
1431 use risingwave_common::util::epoch::test_epoch;
1432
1433 use super::*;
1434 use crate::hummock::iterator::test_utils::{
1435 iterator_test_table_key_of, iterator_test_value_of,
1436 };
1437 use crate::hummock::test_utils::StateStoreReadTestExt;
1438 use crate::memory::sled::SledStateStore;
1439
1440 #[tokio::test]
1441 async fn test_snapshot_isolation_memory() {
1442 let state_store = MemoryStateStore::new();
1443 test_snapshot_isolation_inner(state_store).await;
1444 }
1445
1446 #[cfg(not(madsim))]
1447 #[tokio::test]
1448 async fn test_snapshot_isolation_sled() {
1449 let state_store = SledStateStore::new_temp();
1450 test_snapshot_isolation_inner(state_store).await;
1451 }
1452
    /// Shared scenario verifying snapshot isolation: writes at epoch 0 and at
    /// `test_epoch(1)` must each be visible only to reads at their own epoch or
    /// later, and the delete of `b` must hide the key at the newer epoch only.
    async fn test_snapshot_isolation_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        // Epoch 0: insert a=v1 and b=v1.
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                ],
                vec![],
                WriteOptions {
                    epoch: 0,
                    table_id: Default::default(),
                },
            )
            .unwrap();
        // Epoch 1: overwrite a=v2 and delete b.
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v2".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_delete(),
                    ),
                ],
                vec![],
                WriteOptions {
                    epoch: test_epoch(1),
                    table_id: Default::default(),
                },
            )
            .unwrap();
        // Scan at epoch 0: both keys still carry their original v1 values.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![
                (
                    FullKey::for_test(Default::default(), Bytes::from("a"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                ),
                (
                    FullKey::for_test(Default::default(), Bytes::from("b"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                )
            ]
        );
        // Scan at epoch 0 with limit 1: only the first key is returned.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    Some(1),
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), 0)
                    .encode()
                    .into(),
                b"v1".to_vec().into()
            )]
        );
        // Scan at epoch 1: b is gone and a shows its new value v2.
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    test_epoch(1),
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), test_epoch(1))
                    .encode()
                    .into(),
                b"v2".to_vec().into()
            )]
        );
        // Point reads at epoch 0 see the original values.
        assert_eq!(
            state_store
                .get(TableKey(Bytes::from("a")), 0, ReadOptions::default(),)
                .await
                .unwrap(),
            Some(Bytes::from("v1"))
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"b")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v1".to_vec().into())
        );
        // c was never written at any epoch.
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"c")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        // Point reads at epoch 1 observe the overwrite and the delete.
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"a")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v2".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("b")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("c")),
                    test_epoch(1),
                    ReadOptions::default()
                )
                .await
                .unwrap(),
            None
        );
    }
1620
1621 #[tokio::test]
1622 async fn test_iter_log_memory() {
1623 let state_store = MemoryStateStore::new();
1624 test_iter_log_inner(state_store).await;
1625 }
1626
1627 #[cfg(not(madsim))]
1628 #[tokio::test]
1629 async fn test_iter_log_sled() {
1630 let state_store = SledStateStore::new_temp();
1631 test_iter_log_inner(state_store).await;
1632 }
1633
1634 async fn test_iter_log_inner(state_store: RangeKvStateStore<impl RangeKv>) {
1635 let table_id = TableId::new(233);
1636 let epoch1 = test_epoch(1);
1637 let key_idx = [1, 2, 4];
1638 let make_key = |i| TableKey(Bytes::from(iterator_test_table_key_of(i)));
1639 let make_value = |i| Bytes::from(iterator_test_value_of(i));
1640 state_store
1641 .ingest_batch(
1642 key_idx
1643 .iter()
1644 .map(|i| (make_key(*i), StorageValue::new_put(make_value(*i))))
1645 .collect(),
1646 vec![],
1647 WriteOptions {
1648 epoch: epoch1,
1649 table_id,
1650 },
1651 )
1652 .unwrap();
1653 {
1654 let mut iter = state_store
1655 .iter_log(
1656 (epoch1, epoch1),
1657 (Unbounded, Unbounded),
1658 ReadLogOptions { table_id },
1659 )
1660 .await
1661 .unwrap();
1662 for i in key_idx {
1663 let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
1664 assert_eq!(make_key(i).to_ref(), iter_key);
1665 assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
1666 }
1667 assert!(iter.try_next().await.unwrap().is_none());
1668 }
1669
1670 let epoch2 = test_epoch(2);
1671 state_store
1672 .ingest_batch(
1673 vec![
1674 (make_key(1), StorageValue::new_put(make_value(12))), (make_key(2), StorageValue::new_delete()), (make_key(3), StorageValue::new_put(make_value(3))),
1677 ],
1678 vec![],
1679 WriteOptions {
1680 epoch: epoch2,
1681 table_id,
1682 },
1683 )
1684 .unwrap();
1685
1686 {
1688 let expected = vec![
1689 (
1690 make_key(1),
1691 ChangeLogValue::Update {
1692 new_value: make_value(12),
1693 old_value: make_value(1),
1694 },
1695 ),
1696 (make_key(2), ChangeLogValue::Delete(make_value(2))),
1697 (make_key(3), ChangeLogValue::Insert(make_value(3))),
1698 ];
1699 let mut iter = state_store
1700 .iter_log(
1701 (epoch2, epoch2),
1702 (Unbounded, Unbounded),
1703 ReadLogOptions { table_id },
1704 )
1705 .await
1706 .unwrap();
1707 for (key, change_log_value) in expected {
1708 let (iter_key, iter_value) = iter.try_next().await.unwrap().unwrap();
1709 assert_eq!(
1710 key.to_ref(),
1711 iter_key,
1712 "{:?} {:?}",
1713 change_log_value.to_ref(),
1714 iter_value
1715 );
1716 assert_eq!(change_log_value.to_ref(), iter_value);
1717 }
1718 assert!(iter.try_next().await.unwrap().is_none());
1719 }
1720 {
1722 let mut iter = state_store
1723 .iter_log(
1724 (epoch1, epoch1),
1725 (Unbounded, Unbounded),
1726 ReadLogOptions { table_id },
1727 )
1728 .await
1729 .unwrap();
1730 for i in key_idx {
1731 let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
1732 assert_eq!(make_key(i).to_ref(), iter_key);
1733 assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
1734 }
1735 assert!(iter.try_next().await.unwrap().is_none());
1736 }
1737 {
1739 let mut iter = state_store
1740 .iter_log(
1741 (epoch1, epoch2),
1742 (Unbounded, Unbounded),
1743 ReadLogOptions { table_id },
1744 )
1745 .await
1746 .unwrap();
1747 let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
1748 assert_eq!(make_key(1).to_ref(), iter_key);
1749 assert_eq!(
1750 change_value,
1751 ChangeLogValue::Insert(make_value(12).as_ref())
1752 );
1753 for i in [3, 4] {
1754 let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
1755 assert_eq!(make_key(i).to_ref(), iter_key);
1756 assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
1757 }
1758 assert!(iter.try_next().await.unwrap().is_none());
1759 }
1760 }
1761}