use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::mem::take;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::dispatch_distance_measurement;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH};
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, prefixed_range_with_vnode,
};
use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use thiserror_ext::AsReport;
use tokio::task::yield_now;
use tracing::error;

use crate::error::StorageResult;
use crate::hummock::HummockError;
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, merge_stream,
    sanity_check_enabled,
};
use crate::mem_table::{KeyOp, MemTable};
use crate::storage_value::StorageValue;
use crate::store::*;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type BytesFullKey = FullKey<Bytes>;
pub type BytesFullKeyRange = (Bound<BytesFullKey>, Bound<BytesFullKey>);

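/// Key-value backend abstraction used by [`RangeKvStateStore`]: any clonable,
/// thread-safe store offering ordered forward/reverse range scans over
/// [`BytesFullKey`] and batched ingestion. A `None` value encodes a tombstone.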
#[allow(clippy::type_complexity)]
pub trait RangeKv: Clone + Send + Sync + 'static {
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()>;

    fn flush(&self) -> StorageResult<()>;
}

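/// The default in-memory backend: an ordered map guarded by a `parking_lot` RwLock.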
pub type BTreeMapRangeKv = Arc<RwLock<BTreeMap<BytesFullKey, Option<Bytes>>>>;

impl RangeKv for BTreeMapRangeKv {
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
        let limit = limit.unwrap_or(usize::MAX);
        Ok(self
            .read()
            .range(range)
            .take(limit)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect())
    }

    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
        let limit = limit.unwrap_or(usize::MAX);
        Ok(self
            .read()
            .range(range)
            .rev()
            .take(limit)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect())
    }

    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()> {
        let mut inner = self.write();
        for (key, value) in kv_pairs {
            inner.insert(key, value);
        }
        Ok(())
    }

    fn flush(&self) -> StorageResult<()> {
        Ok(())
    }
}

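/// A persistent [`RangeKv`] backend on top of the `sled` embedded database, mainly
/// useful for tests that need state to survive beyond a single process.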
pub mod sled {
    use std::fs::create_dir_all;
    use std::ops::RangeBounds;

    use bytes::Bytes;
    use risingwave_hummock_sdk::key::FullKey;

    use crate::error::StorageResult;
    use crate::memory::{BytesFullKey, BytesFullKeyRange, RangeKv, RangeKvStateStore};

    #[derive(Clone)]
    pub struct SledRangeKv {
        inner: sled::Db,
    }

    impl SledRangeKv {
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            SledRangeKv {
                inner: sled::open(path).expect("open"),
            }
        }

        pub fn new_temp() -> Self {
            create_dir_all("./.risingwave/sled").expect("should create");
            let path = tempfile::TempDir::new_in("./.risingwave/sled")
                .expect("find temp dir")
                .keep();
            Self::new(path)
        }
    }

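    // Values are stored with a one-byte tag so a tombstone (`None`) can be told apart
    // from an empty value: `EMPTY` marks a tombstone, `NON_EMPTY` prefixes raw bytes.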
    const EMPTY: u8 = 1;
    const NON_EMPTY: u8 = 0;

    impl RangeKv for SledRangeKv {
        fn range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self.inner.range((left_encoded, right_encoded)).take(limit) {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        fn rev_range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self
                .inner
                .range((left_encoded, right_encoded))
                .rev()
                .take(limit)
            {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        fn ingest_batch(
            &self,
            kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
        ) -> StorageResult<()> {
            let mut batch = sled::Batch::default();
            for (key, value) in kv_pairs {
                let encoded_key = key.encode_reverse_epoch();
                let key = sled::IVec::from(encoded_key);
                let mut buffer =
                    Vec::with_capacity(value.as_ref().map(|v| v.len()).unwrap_or_default() + 1);
                if let Some(value) = value {
                    buffer.push(NON_EMPTY);
                    buffer.extend_from_slice(value.as_ref());
                } else {
                    buffer.push(EMPTY);
                }
                let value = sled::IVec::from(buffer);
                batch.insert(key, value);
            }
            self.inner.apply_batch(batch)?;
            Ok(())
        }

        fn flush(&self) -> StorageResult<()> {
            Ok(self.inner.flush().map(|_| {})?)
        }
    }

    pub type SledStateStore = RangeKvStateStore<SledRangeKv>;

    impl SledStateStore {
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new(path),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }

        pub fn new_temp() -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new_temp(),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }
    }

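    // The encoded byte order of `FullKey`s with variable-length table keys does not
    // always agree with the logical `FullKey` order, so a raw scan over the encoded
    // range may return false positives; `range`/`rev_range` re-check each decoded key
    // against the original bound. The test below pins down one such case.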
    #[cfg(test)]
    mod test {
        use std::ops::{Bound, RangeBounds};

        use bytes::Bytes;
        use risingwave_common::catalog::TableId;
        use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
        use risingwave_hummock_sdk::EpochWithGap;
        use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

        use crate::memory::RangeKv;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_filter_variable_key_length_false_positive() {
            let table_id = TableId { table_id: 233 };
            let epoch = u64::MAX - u64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]);
            let excluded_short_table_key = [0, 1, 0, 0];
            let included_long_table_key = [0, 1, 0, 0, 1, 2];
            let left_table_key = [0, 1, 0, 0, 1];
            let right_table_key = [0, 1, 1, 1];

            let to_full_key = |table_key: &[u8]| FullKey {
                user_key: UserKey {
                    table_id,
                    table_key: TableKey(Bytes::from(table_key.to_vec())),
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(epoch & !EPOCH_SPILL_TIME_MASK),
            };

            let left_full_key = to_full_key(&left_table_key[..]);
            let right_full_key = to_full_key(&right_table_key[..]);
            let included_long_full_key = to_full_key(&included_long_table_key[..]);
            let excluded_short_full_key = to_full_key(&excluded_short_table_key[..]);

            // The short key lies outside the logical full-key range...
            assert!(
                (
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&included_long_full_key.to_ref())
            );
            assert!(
                !(
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&excluded_short_full_key.to_ref())
            );

            let left_encoded = left_full_key.encode_reverse_epoch();
            let right_encoded = right_full_key.encode_reverse_epoch();

            // ...yet inside the encoded byte range: a false positive the store must filter.
            assert!(
                (
                    Bound::Included(left_encoded.clone()),
                    Bound::Included(right_encoded.clone())
                )
                    .contains(&included_long_full_key.encode_reverse_epoch())
            );
            assert!(
                (
                    Bound::Included(left_encoded),
                    Bound::Included(right_encoded)
                )
                    .contains(&excluded_short_full_key.encode_reverse_epoch())
            );

            let sled_range_kv = SledRangeKv::new_temp();
            sled_range_kv
                .ingest_batch(
                    vec![
                        (included_long_full_key.clone(), None),
                        (excluded_short_full_key, None),
                    ]
                    .into_iter(),
                )
                .unwrap();
            let kvs = sled_range_kv
                .range(
                    (
                        Bound::Included(left_full_key),
                        Bound::Included(right_full_key),
                    ),
                    None,
                )
                .unwrap();
            assert_eq!(1, kvs.len());
            assert_eq!(included_long_full_key.to_ref(), kvs[0].0.to_ref());
            assert!(kvs[0].1.is_none());
        }
    }
}

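/// Batched iteration over a [`RangeKv`], so that a long scan does not hold a read
/// lock (or a backend cursor) for its entire lifetime.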
mod batched_iter {
    use super::*;

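    /// Iterator that pulls entries from the underlying [`RangeKv`] in batches of
    /// `BATCH_SIZE`, advancing the remaining range past the last returned key after
    /// each refill. `rev` selects reverse iteration.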
    pub struct Iter<R: RangeKv> {
        inner: R,
        range: BytesFullKeyRange,
        current: std::vec::IntoIter<(FullKey<Bytes>, Option<Bytes>)>,
        rev: bool,
    }

    impl<R: RangeKv> Iter<R> {
        pub fn new(inner: R, range: BytesFullKeyRange, rev: bool) -> Self {
            Self {
                inner,
                range,
                rev,
                current: Vec::new().into_iter(),
            }
        }
    }

    impl<R: RangeKv> Iter<R> {
        const BATCH_SIZE: usize = 256;

        /// Refill the internal buffer with the next batch from the underlying store.
        fn refill(&mut self) -> StorageResult<()> {
            assert!(self.current.is_empty());

            let batch = if self.rev {
                self.inner.rev_range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            } else {
                self.inner.range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            };

            // Shrink the remaining range past the last key of this batch, so the
            // next refill resumes right after it.
            if let Some((last_key, _)) = batch.last() {
                let full_key = FullKey::new_with_gap_epoch(
                    last_key.user_key.table_id,
                    TableKey(last_key.user_key.table_key.0.clone()),
                    last_key.epoch_with_gap,
                );
                if self.rev {
                    self.range.1 = Bound::Excluded(full_key);
                } else {
                    self.range.0 = Bound::Excluded(full_key);
                }
            }
            self.current = batch.into_iter();
            Ok(())
        }
    }

    impl<R: RangeKv> Iter<R> {
        #[allow(clippy::type_complexity)]
        pub fn next(&mut self) -> StorageResult<Option<(BytesFullKey, Option<Bytes>)>> {
            match self.current.next() {
                Some((key, value)) => Ok(Some((key, value))),
                None => {
                    self.refill()?;
                    Ok(self.current.next())
                }
            }
        }
    }

    #[cfg(test)]
    mod tests {
        use rand::Rng;

        use super::*;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_btreemap_iter_chaos() {
            let map = Arc::new(RwLock::new(BTreeMap::new()));
            test_iter_chaos_inner(map, 1000);
        }

        #[cfg(not(madsim))]
        #[test]
        fn test_sled_iter_chaos() {
            let map = SledRangeKv::new_temp();
            test_iter_chaos_inner(map, 100);
        }

        fn test_iter_chaos_inner(map: impl RangeKv, count: usize) {
            let key_range = 1..=10000;
            let num_to_bytes = |k: i32| Bytes::from(format!("{:06}", k).as_bytes().to_vec());
            let num_to_full_key =
                |k: i32| FullKey::new(TableId::default(), TableKey(num_to_bytes(k)), 0);
            #[allow(clippy::mutable_key_type)]
            map.ingest_batch(key_range.clone().map(|k| {
                let key = num_to_full_key(k);
                let b = key.user_key.table_key.0.clone();

                (key, Some(b))
            }))
            .unwrap();

            let rand_bound = || {
                let key = rand::rng().random_range(key_range.clone());
                let key = num_to_full_key(key);
                match rand::rng().random_range(1..=5) {
                    1 | 2 => Bound::Included(key),
                    3 | 4 => Bound::Excluded(key),
                    _ => Bound::Unbounded,
                }
            };

            for _ in 0..count {
                let range = loop {
                    let range = (rand_bound(), rand_bound());
                    let (start, end) = (range.start_bound(), range.end_bound());

                    match (start, end) {
                        // Skip ranges that `BTreeMap::range` would panic on.
                        (Bound::Excluded(s), Bound::Excluded(e)) if s == e => {
                            continue;
                        }
                        (
                            Bound::Included(s) | Bound::Excluded(s),
                            Bound::Included(e) | Bound::Excluded(e),
                        ) if s > e => {
                            continue;
                        }
                        _ => break range,
                    }
                };

                let v1 = {
                    let mut v = vec![];
                    let mut iter = Iter::new(map.clone(), range.clone(), false);
                    while let Some((key, value)) = iter.next().unwrap() {
                        v.push((key, value));
                    }
                    v
                };
                let v2 = map.range(range, None).unwrap();

                // The batched iterator must agree with one unbatched scan.
                assert_eq!(v1, v2);
            }
        }
    }
}

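/// An in-memory state store backed by [`BTreeMapRangeKv`]. Old epochs are never
/// garbage-collected, so memory usage only grows; this store is meant for tests and
/// local experimentation, never for production.
///
/// A minimal usage sketch (marked `ignore` since it assumes a test context):
///
/// ```ignore
/// use risingwave_storage::memory::MemoryStateStore;
///
/// // A fresh, private store.
/// let store = MemoryStateStore::new();
/// // A process-wide singleton, handy when several test components must
/// // observe the same data.
/// let shared = MemoryStateStore::shared();
/// ```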
pub type MemoryStateStore = RangeKvStateStore<BTreeMapRangeKv>;

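/// Per-table epoch bookkeeping: the epoch the table was initialized at, the successor
/// of each epoch (`next_epochs`), the latest epoch sealed by all vnodes, and, for each
/// epoch still being sealed, a bitmap of the vnodes that have sealed it so far.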
struct TableState {
    init_epoch: u64,
    next_epochs: BTreeMap<u64, u64>,
    latest_sealed_epoch: Option<u64>,
    sealing_epochs: BTreeMap<u64, BitmapBuilder>,
}

impl TableState {
    fn new(init_epoch: u64) -> Self {
        Self {
            init_epoch,
            next_epochs: Default::default(),
            latest_sealed_epoch: None,
            sealing_epochs: Default::default(),
        }
    }

    /// Busy-wait (yielding to the runtime) until `epoch` has been sealed for `table_id`.
    async fn wait_epoch(
        tables: &parking_lot::Mutex<HashMap<TableId, Self>>,
        table_id: TableId,
        epoch: u64,
    ) {
        loop {
            {
                let tables = tables.lock();
                let table_state = tables.get(&table_id).expect("should exist");
                assert!(epoch >= table_state.init_epoch);
                if epoch == table_state.init_epoch {
                    return;
                }
                if let Some(latest_sealed_epoch) = table_state.latest_sealed_epoch
                    && latest_sealed_epoch >= epoch
                {
                    return;
                }
            }
            yield_now().await;
        }
    }
}

type InMemVectorStore = Arc<RwLock<HashMap<TableId, Vec<(Vector, Bytes, u64)>>>>;

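/// A state store over an arbitrary [`RangeKv`] backend. Every version of every key is
/// retained (one entry per epoch) and nothing is ever garbage-collected, so this store
/// is only suitable for tests and in-memory experimentation.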
#[derive(Clone, Default)]
pub struct RangeKvStateStore<R: RangeKv> {
    /// Stores (key, epoch) -> user value.
    inner: R,
    /// `table_id` -> per-table epoch state.
    tables: Arc<parking_lot::Mutex<HashMap<TableId, TableState>>>,

    vectors: InMemVectorStore,
}

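/// Widens a per-table key range into a [`BytesFullKeyRange`]: bounds are extended with
/// the epoch extremes so that an inclusive bound captures every version of a key, and
/// an unbounded end stops right before the next table id.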
fn to_full_key_range<R, B>(table_id: TableId, table_key_range: R) -> BytesFullKeyRange
where
    R: RangeBounds<B> + Send,
    B: AsRef<[u8]>,
{
    let start = match table_key_range.start_bound() {
        Included(k) => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            HummockEpoch::MAX,
        )),
        Excluded(k) => Excluded(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            0,
        )),
        Unbounded => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(b"".to_vec())),
            HummockEpoch::MAX,
        )),
    };
    let end = match table_key_range.end_bound() {
        Included(k) => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            0,
        )),
        Excluded(k) => Excluded(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            HummockEpoch::MAX,
        )),
        Unbounded => {
            if let Some(next_table_id) = table_id.table_id().checked_add(1) {
                Excluded(FullKey::new(
                    next_table_id.into(),
                    TableKey(Bytes::from(b"".to_vec())),
                    HummockEpoch::MAX,
                ))
            } else {
                Unbounded
            }
        }
    };
    (start, end)
}

impl MemoryStateStore {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn shared() -> Self {
        static STORE: LazyLock<MemoryStateStore> = LazyLock::new(MemoryStateStore::new);
        STORE.clone()
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    /// Scan the newest version (<= `epoch`) of each key in `key_range`, skipping
    /// tombstones, up to `limit` entries.
    fn scan(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(Bytes, Bytes)>> {
        let mut data = vec![];
        if limit == Some(0) {
            return Ok(vec![]);
        }
        let mut last_user_key = None;
        for (key, value) in self
            .inner
            .range(to_full_key_range(table_id, key_range), None)?
        {
            if key.epoch_with_gap.pure_epoch() > epoch {
                continue;
            }
            // Only the first (newest visible) version of each user key counts.
            if Some(&key.user_key) != last_user_key.as_ref() {
                if let Some(value) = value {
                    data.push((Bytes::from(key.encode()), value.clone()));
                }
                last_user_key = Some(key.user_key.clone());
            }
            if let Some(limit) = limit
                && data.len() >= limit
            {
                break;
            }
        }
        Ok(data)
    }
}

#[derive(Clone)]
pub struct RangeKvStateStoreReadSnapshot<R: RangeKv> {
    inner: RangeKvStateStore<R>,
    epoch: u64,
    table_id: TableId,
}

impl<R: RangeKv> StateStoreGet for RangeKvStateStoreReadSnapshot<R> {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        self.inner
            .get_keyed_row_impl(key, self.epoch, self.table_id)
            .and_then(|option| {
                if let Some((key, value)) = option {
                    on_key_value_fn(key.to_ref(), value.as_ref()).map(Some)
                } else {
                    Ok(None)
                }
            })
    }
}

impl<R: RangeKv> StateStoreRead for RangeKvStateStoreReadSnapshot<R> {
    type Iter = RangeKvStateStoreIter<R>;
    type RevIter = RangeKvStateStoreRevIter<R>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter> {
        self.inner.iter_impl(key_range, self.epoch, self.table_id)
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter> {
        self.inner
            .rev_iter_impl(key_range, self.epoch, self.table_id)
    }
}

impl<R: RangeKv> StateStoreReadVector for RangeKvStateStoreReadSnapshot<R> {
    async fn nearest<O: Send + 'static>(
        &self,
        vec: Vector,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<O>,
    ) -> StorageResult<Vec<O>> {
        fn nearest_impl<M: MeasureDistanceBuilder, O>(
            store: &InMemVectorStore,
            epoch: u64,
            table_id: TableId,
            vec: Vector,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<O>,
        ) -> Vec<O> {
            let mut builder = NearestBuilder::<'_, O, M>::new(vec.to_ref(), options.top_n);
            builder.add(
                store
                    .read()
                    .get(&table_id)
                    .map(|vec| vec.iter())
                    .into_iter()
                    .flatten()
                    .filter(|(_, _, vector_epoch)| epoch >= *vector_epoch)
                    .map(|(vec, info, _)| (vec.to_ref(), info.as_ref())),
                on_nearest_item_fn,
            );
            builder.finish()
        }
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(nearest_impl::<MeasurementType, O>(
                &self.inner.vectors,
                self.epoch,
                self.table_id,
                vec,
                options,
                on_nearest_item_fn,
            ))
        })
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn get_keyed_row_impl(
        &self,
        key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        let range_bounds = (Bound::Included(key.clone()), Bound::Included(key));
        // A point get is a scan over the single-key range with `limit == 1`.
        let res = self.scan(range_bounds, epoch, table_id, Some(1))?;

        Ok(match res.as_slice() {
            [] => None,
            [(key, value)] => Some((
                FullKey::decode(key.as_ref()).to_vec().into_bytes(),
                value.clone(),
            )),
            _ => unreachable!(),
        })
    }

    fn iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreIter<R>> {
        Ok(RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                false,
            ),
            epoch,
            true,
        ))
    }

    fn rev_iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreRevIter<R>> {
        Ok(RangeKvStateStoreRevIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                true,
            ),
            epoch,
            true,
        ))
    }
}

impl<R: RangeKv> StateStoreReadLog for RangeKvStateStore<R> {
    type ChangeLogIter = RangeKvStateStoreChangeLogIter<R>;

    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        loop {
            {
                let tables = self.tables.lock();
                let Some(tables) = tables.get(&options.table_id) else {
                    return Err(HummockError::next_epoch(format!(
                        "table {} not exist",
                        options.table_id
                    ))
                    .into());
                };
                if let Some(next_epoch) = tables.next_epochs.get(&epoch) {
                    break Ok(*next_epoch);
                }
            }
            yield_now().await;
        }
    }

    async fn iter_log(
        &self,
        (min_epoch, max_epoch): (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let new_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range.clone()),
                false,
            ),
            max_epoch,
            true,
        );
        let old_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range),
                false,
            ),
            min_epoch,
            false,
        );
        RangeKvStateStoreChangeLogIter::new(new_value_iter, old_value_iter)
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn new_read_snapshot_impl(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> RangeKvStateStoreReadSnapshot<R> {
        RangeKvStateStoreReadSnapshot {
            inner: self.clone(),
            epoch,
            table_id,
        }
    }

    pub(crate) fn ingest_batch(
        &self,
        mut kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
        delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<usize> {
        // Materialize range deletes into point tombstones for every key currently
        // covered by a delete range.
        let mut delete_keys = BTreeSet::new();
        for del_range in delete_ranges {
            for (key, _) in self.inner.range(
                (
                    del_range
                        .0
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                    del_range
                        .1
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                ),
                None,
            )? {
                delete_keys.insert(key.user_key.table_key);
            }
        }
        for key in delete_keys {
            kv_pairs.push((key, StorageValue::new_delete()));
        }

        let mut size = 0;
        self.inner
            .ingest_batch(kv_pairs.into_iter().map(|(key, value)| {
                size += key.len() + value.size();
                (FullKey::new(table_id, key, epoch), value.user_value)
            }))?;
        Ok(size)
    }

    fn ingest_vectors(&self, table_id: TableId, epoch: u64, vecs: Vec<(Vector, Bytes)>) {
        self.vectors
            .write()
            .entry(table_id)
            .or_default()
            .extend(vecs.into_iter().map(|(vec, info)| (vec, info, epoch)));
    }
}

impl<R: RangeKv> StateStore for RangeKvStateStore<R> {
    type Local = RangeKvLocalStateStore<R>;
    type ReadSnapshot = RangeKvStateStoreReadSnapshot<R>;
    type VectorWriter = RangeKvLocalStateStore<R>;

    async fn try_wait_epoch(
        &self,
        _epoch: HummockReadEpoch,
        _options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        // Writes are immediately visible in this store, so there is nothing to wait for.
        Ok(())
    }

    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
        RangeKvLocalStateStore::new(self.clone(), option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        Ok(self.new_read_snapshot_impl(epoch.get_epoch(), options.table_id))
    }

    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        RangeKvLocalStateStore::new(
            self.clone(),
            NewLocalOptions {
                table_id: options.table_id,
                op_consistency_level: Default::default(),
                table_option: Default::default(),
                is_replicated: false,
                vnodes: Arc::new(Bitmap::from_bool_slice(&[true])),
                upload_on_flush: true,
            },
        )
    }
}

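/// The table-local store handed to executors: writes are buffered in a [`MemTable`]
/// and only reach the shared [`RangeKvStateStore`] on `flush`, stamped with the
/// current epoch.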
pub struct RangeKvLocalStateStore<R: RangeKv> {
    mem_table: MemTable,
    vectors: Vec<(Vector, Bytes)>,
    inner: RangeKvStateStore<R>,

    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    table_option: TableOption,
    vnodes: Arc<Bitmap>,
}

impl<R: RangeKv> RangeKvLocalStateStore<R> {
    pub fn new(inner: RangeKvStateStore<R>, option: NewLocalOptions) -> Self {
        Self {
            inner,
            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            table_option: option.table_option,
            vnodes: option.vnodes,
            vectors: vec![],
        }
    }

    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }
}

impl<R: RangeKv> StateStoreGet for RangeKvLocalStateStore<R> {
    async fn on_key_value<O: Send + 'static>(
        &self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<O>,
    ) -> StorageResult<Option<O>> {
        // Consult the uncommitted mem-table first; fall back to the shared store.
        if let Some((key, value)) = match self.mem_table.buffer.get(&key) {
            None => self
                .inner
                .get_keyed_row_impl(key, self.epoch(), self.table_id)?,
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Some((
                    FullKey::new(self.table_id, key, self.epoch()),
                    value.clone(),
                )),
                KeyOp::Delete(_) => None,
            },
        } {
            Ok(Some(on_key_value_fn(key.to_ref(), value.as_ref())?))
        } else {
            Ok(None)
        }
    }
}

impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
    type FlushedSnapshotReader = RangeKvStateStoreReadSnapshot<R>;

    type Iter<'a> = impl StateStoreIter + 'a;
    type RevIter<'a> = impl StateStoreIter + 'a;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let iter = self
            .inner
            .iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            false,
        ))))
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let iter = self
            .inner
            .rev_iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.rev_iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            true,
        ))))
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };
        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        Ok(self.mem_table.delete(key, old_val)?)
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(
                &self.inner.tables,
                self.table_id,
                self.epoch.expect("should have init").prev,
            )
            .await;
        }
        Ok(std::mem::replace(&mut self.vnodes, vnodes))
    }

    fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
        // Table watermarks are not tracked by this in-memory implementation.
        None
    }

    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id)
    }
}

impl<R: RangeKv> StateStoreWriteEpochControl for RangeKvLocalStateStore<R> {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let sanity_check_read_snapshot = if sanity_check_enabled() {
            Some(self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id))
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                // Each mem-table op is sanity-checked (when enabled) against the
                // flushed state before being turned into a storage write.
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_read_snapshot,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(value)));
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_read_snapshot,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_delete()));
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_read_snapshot,
                            self.table_option,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(new_value)));
                }
            }
        }
        let epoch = self.epoch();
        self.inner
            .ingest_vectors(self.table_id, epoch, take(&mut self.vectors));
        self.inner
            .ingest_batch(kv_pairs, vec![], epoch, self.table_id)
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        assert_eq!(
            self.epoch.replace(options.epoch),
            None,
            "epoch in local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        self.inner
            .tables
            .lock()
            .entry(self.table_id)
            .or_insert_with(|| TableState::new(options.epoch.prev))
            .next_epochs
            .insert(options.epoch.prev, options.epoch.curr);
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(&self.inner.tables, self.table_id, options.epoch.prev).await;
        }

        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if let Some(value_checker) = opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(&value_checker);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        let mut tables = self.inner.tables.lock();
        let table_state = tables
            .get_mut(&self.table_id)
            .expect("should be set when init");

        table_state.next_epochs.insert(prev_epoch, next_epoch);
        if self.vnodes.len() > 1 {
            // Mark our vnodes as having sealed `prev_epoch`; once every vnode has
            // sealed it, promote it to the table's latest sealed epoch.
            let sealing_epoch_vnodes = table_state
                .sealing_epochs
                .entry(prev_epoch)
                .or_insert_with(|| BitmapBuilder::zeroed(self.vnodes.len()));
            assert_eq!(self.vnodes.len(), sealing_epoch_vnodes.len());
            for vnode in self.vnodes.iter_ones() {
                assert!(!sealing_epoch_vnodes.is_set(vnode));
                sealing_epoch_vnodes.set(vnode, true);
            }
            if (0..self.vnodes.len()).all(|vnode| sealing_epoch_vnodes.is_set(vnode)) {
                let (all_sealed_epoch, _) =
                    table_state.sealing_epochs.pop_first().expect("non-empty");
                assert_eq!(
                    all_sealed_epoch, prev_epoch,
                    "new all_sealed_epoch must be the current prev epoch"
                );
                if let Some(prev_latest_sealed_epoch) =
                    table_state.latest_sealed_epoch.replace(prev_epoch)
                {
                    assert!(prev_epoch > prev_latest_sealed_epoch);
                }
            }
        }

        if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks {
            // Translate per-vnode table watermarks into delete ranges over the
            // region that falls outside the watermark.
            let delete_ranges = watermarks
                .iter()
                .flat_map(|vnode_watermark| {
                    let inner_range = match direction {
                        WatermarkDirection::Ascending => {
                            (Unbounded, Excluded(vnode_watermark.watermark().clone()))
                        }
                        WatermarkDirection::Descending => {
                            (Excluded(vnode_watermark.watermark().clone()), Unbounded)
                        }
                    };
                    vnode_watermark
                        .vnode_bitmap()
                        .iter_vnodes()
                        .map(move |vnode| {
                            let (start, end) =
                                prefixed_range_with_vnode(inner_range.clone(), vnode);
                            (start.map(|key| key.0.clone()), end.map(|key| key.0.clone()))
                        })
                })
                .collect_vec();
            if let Err(e) =
                self.inner
                    .ingest_batch(Vec::new(), delete_ranges, self.epoch(), self.table_id)
            {
                error!(error = %e.as_report(), "failed to write delete ranges of table watermark");
            }
        }
    }

    async fn try_flush(&mut self) -> StorageResult<()> {
        Ok(())
    }
}

impl<R: RangeKv> StateStoreWriteVector for RangeKvLocalStateStore<R> {
    fn insert(&mut self, vec: Vector, info: Bytes) -> StorageResult<()> {
        self.vectors.push((vec, info));
        Ok(())
    }
}

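/// Forward iterator that collapses multi-version entries down to the newest version
/// of each user key visible at `epoch` (inclusively or exclusively, depending on
/// `is_inclusive_epoch`), skipping tombstones.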
pub struct RangeKvStateStoreIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    last_key: Option<UserKey<Bytes>>,

    item_buffer: Option<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            last_key: None,
            item_buffer: None,
        }
    }
}

impl<R: RangeKv> StateStoreIter for RangeKvStateStoreIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer = None;
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }
            // The first version of a user key in scan order is its newest visible
            // one; later versions and tombstoned keys are skipped.
            if Some(key.user_key.as_ref()) != self.last_key.as_ref().map(|key| key.as_ref()) {
                self.last_key = Some(key.user_key.clone());
                if let Some(value) = value {
                    self.item_buffer = Some((key, value));
                    break;
                }
            }
        }
        Ok(())
    }
}

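/// Reverse-order counterpart of [`RangeKvStateStoreIter`]. Scanning backwards yields
/// the versions of a key oldest-first, so candidates are staged in `item_buffer` and
/// replaced until the user key changes, leaving only the newest visible version.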
pub struct RangeKvStateStoreRevIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    item_buffer: VecDeque<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            item_buffer: VecDeque::default(),
        }
    }
}

impl<R: RangeKv> StateStoreIter for RangeKvStateStoreRevIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .back()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer.pop_back();
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }

            let v = match value {
                Some(v) => v,
                None => {
                    // A newer tombstone shadows any staged older version of the key.
                    if let Some(last_key) = self.item_buffer.front()
                        && key.user_key.as_ref() == last_key.0.user_key.as_ref()
                    {
                        self.item_buffer.clear();
                    }
                    continue;
                }
            };

            if let Some(last_key) = self.item_buffer.front() {
                if key.user_key.as_ref() != last_key.0.user_key.as_ref() {
                    self.item_buffer.push_front((key, v));
                    break;
                } else {
                    // A newer version of the same key replaces the staged one.
                    self.item_buffer.pop_front();
                    self.item_buffer.push_front((key, v));
                }
            } else {
                self.item_buffer.push_front((key, v));
            }
        }
        Ok(())
    }
}

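/// Merge-joins a "new value" iterator (reading at the log range's max epoch) with an
/// "old value" iterator (reading strictly before its min epoch) to reconstruct per-key
/// change-log entries: only-new yields `Insert`, only-old yields `Delete`, and both
/// yield `Update` (elided entirely when the value is unchanged).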
pub struct RangeKvStateStoreChangeLogIter<R: RangeKv> {
    new_value_iter: RangeKvStateStoreIter<R>,
    old_value_iter: RangeKvStateStoreIter<R>,
    item_buffer: Option<(TableKey<Bytes>, ChangeLogValue<Bytes>)>,
}

impl<R: RangeKv> RangeKvStateStoreChangeLogIter<R> {
    fn new(
        mut new_value_iter: RangeKvStateStoreIter<R>,
        mut old_value_iter: RangeKvStateStoreIter<R>,
    ) -> StorageResult<Self> {
        new_value_iter.next_inner()?;
        old_value_iter.next_inner()?;
        Ok(Self {
            new_value_iter,
            old_value_iter,
            item_buffer: None,
        })
    }
}

impl<R: RangeKv> StateStoreIter<StateStoreReadLogItem> for RangeKvStateStoreChangeLogIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
        loop {
            match (
                &self.new_value_iter.item_buffer,
                &self.old_value_iter.item_buffer,
            ) {
                (None, None) => {
                    self.item_buffer = None;
                    break;
                }
                (Some((key, new_value)), None) => {
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Insert(new_value.clone()),
                    ));
                    self.new_value_iter.next_inner()?;
                }
                (None, Some((key, old_value))) => {
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Delete(old_value.clone()),
                    ));
                    self.old_value_iter.next_inner()?;
                }
                (Some((new_value_key, new_value)), Some((old_value_key, old_value))) => {
                    match new_value_key.user_key.cmp(&old_value_key.user_key) {
                        Ordering::Less => {
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Insert(new_value.clone()),
                            ));
                            self.new_value_iter.next_inner()?;
                        }
                        Ordering::Greater => {
                            self.item_buffer = Some((
                                old_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Delete(old_value.clone()),
                            ));
                            self.old_value_iter.next_inner()?;
                        }
                        Ordering::Equal => {
                            // The value round-tripped within the epoch range; emit
                            // nothing for this key.
                            if new_value == old_value {
                                self.new_value_iter.next_inner()?;
                                self.old_value_iter.next_inner()?;
                                continue;
                            }
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Update {
                                    new_value: new_value.clone(),
                                    old_value: old_value.clone(),
                                },
                            ));
                            self.new_value_iter.next_inner()?;
                            self.old_value_iter.next_inner()?;
                        }
                    }
                }
            };
            break;
        }
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.to_ref())))
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;
    use crate::hummock::iterator::test_utils::{
        iterator_test_table_key_of, iterator_test_value_of,
    };
    use crate::hummock::test_utils::{ReadOptions, *};
    use crate::memory::sled::SledStateStore;

    #[tokio::test]
    async fn test_snapshot_isolation_memory() {
        let state_store = MemoryStateStore::new();
        test_snapshot_isolation_inner(state_store).await;
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_snapshot_isolation_sled() {
        let state_store = SledStateStore::new_temp();
        test_snapshot_isolation_inner(state_store).await;
    }

    async fn test_snapshot_isolation_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                ],
                vec![],
                0,
                Default::default(),
            )
            .unwrap();
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v2".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_delete(),
                    ),
                ],
                vec![],
                test_epoch(1),
                Default::default(),
            )
            .unwrap();
        // Reads at epoch 0 still see the first batch...
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![
                (
                    FullKey::for_test(Default::default(), Bytes::from("a"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                ),
                (
                    FullKey::for_test(Default::default(), Bytes::from("b"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                )
            ]
        );
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    Some(1),
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), 0)
                    .encode()
                    .into(),
                b"v1".to_vec().into()
            )]
        );
        // ...while reads at epoch 1 see the update to "a" and the deletion of "b".
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    test_epoch(1),
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), test_epoch(1))
                    .encode()
                    .into(),
                b"v2".to_vec().into()
            )]
        );
        assert_eq!(
            state_store
                .get(TableKey(Bytes::from("a")), 0, ReadOptions::default())
                .await
                .unwrap(),
            Some(Bytes::from("v1"))
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"b")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v1".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"c")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"a")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v2".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("b")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("c")),
                    test_epoch(1),
                    ReadOptions::default()
                )
                .await
                .unwrap(),
            None
        );
    }

    #[tokio::test]
    async fn test_iter_log_memory() {
        let state_store = MemoryStateStore::new();
        test_iter_log_inner(state_store).await;
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_iter_log_sled() {
        let state_store = SledStateStore::new_temp();
        test_iter_log_inner(state_store).await;
    }

    async fn test_iter_log_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        let table_id = TableId::new(233);
        let epoch1 = test_epoch(1);
        let key_idx = [1, 2, 4];
        let make_key = |i| TableKey(Bytes::from(iterator_test_table_key_of(i)));
        let make_value = |i| Bytes::from(iterator_test_value_of(i));
        state_store
            .ingest_batch(
                key_idx
                    .iter()
                    .map(|i| (make_key(*i), StorageValue::new_put(make_value(*i))))
                    .collect(),
                vec![],
                epoch1,
                table_id,
            )
            .unwrap();
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }

        let epoch2 = test_epoch(2);
        state_store
            .ingest_batch(
                vec![
                    (make_key(1), StorageValue::new_put(make_value(12))),
                    (make_key(2), StorageValue::new_delete()),
                    (make_key(3), StorageValue::new_put(make_value(3))),
                ],
                vec![],
                epoch2,
                table_id,
            )
            .unwrap();

        {
            // The change log of epoch2 alone: an update, a delete, and an insert.
            let expected = vec![
                (
                    make_key(1),
                    ChangeLogValue::Update {
                        new_value: make_value(12),
                        old_value: make_value(1),
                    },
                ),
                (make_key(2), ChangeLogValue::Delete(make_value(2))),
                (make_key(3), ChangeLogValue::Insert(make_value(3))),
            ];
            let mut iter = state_store
                .iter_log(
                    (epoch2, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for (key, change_log_value) in expected {
                let (iter_key, iter_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(
                    key.to_ref(),
                    iter_key,
                    "{:?} {:?}",
                    change_log_value.to_ref(),
                    iter_value
                );
                assert_eq!(change_log_value.to_ref(), iter_value);
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        {
            // The log of epoch1 is unaffected by the later writes at epoch2.
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        {
            // A range covering both epochs collapses each key to its net change:
            // key 2 (inserted then deleted) disappears entirely.
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
            assert_eq!(make_key(1).to_ref(), iter_key);
            assert_eq!(
                change_value,
                ChangeLogValue::Insert(make_value(12).as_ref())
            );
            for i in [3, 4] {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
    }
}