use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::mem::take;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::array::VectorRef;
use risingwave_common::bitmap::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::TableId;
use risingwave_common::dispatch_distance_measurement;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::id::FragmentId;
use risingwave_common::types::ScalarRef;
use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH};
use risingwave_hummock_sdk::key::{
    FullKey, TableKey, TableKeyRange, UserKey, prefixed_range_with_vnode,
};
use risingwave_hummock_sdk::table_watermark::WatermarkDirection;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use thiserror_ext::AsReport;
use tokio::task::yield_now;
use tracing::error;

use crate::error::StorageResult;
use crate::hummock::HummockError;
use crate::hummock::utils::{
    do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, merge_stream,
    sanity_check_enabled,
};
use crate::mem_table::{KeyOp, MemTable};
use crate::storage_value::StorageValue;
use crate::store::*;
use crate::vector::{MeasureDistanceBuilder, NearestBuilder};

pub type BytesFullKey = FullKey<Bytes>;
pub type BytesFullKeyRange = (Bound<BytesFullKey>, Bound<BytesFullKey>);

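/// The minimal key-value interface shared by the in-memory and sled-backed
/// state stores below: bounded forward and backward range scans over full
/// keys, batched ingestion, and an explicit flush. A `None` value represents
/// a tombstone.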
#[allow(clippy::type_complexity)]
pub trait RangeKv: Clone + Send + Sync + 'static {
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>>;

    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()>;

    fn flush(&self) -> StorageResult<()>;
}

pub type BTreeMapRangeKv = Arc<RwLock<BTreeMap<BytesFullKey, Option<Bytes>>>>;

impl RangeKv for BTreeMapRangeKv {
    fn range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
        let limit = limit.unwrap_or(usize::MAX);
        Ok(self
            .read()
            .range(range)
            .take(limit)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect())
    }

    fn rev_range(
        &self,
        range: BytesFullKeyRange,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
        let limit = limit.unwrap_or(usize::MAX);
        Ok(self
            .read()
            .range(range)
            .rev()
            .take(limit)
            .map(|(key, value)| (key.clone(), value.clone()))
            .collect())
    }

    fn ingest_batch(
        &self,
        kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
    ) -> StorageResult<()> {
        let mut inner = self.write();
        for (key, value) in kv_pairs {
            inner.insert(key, value);
        }
        Ok(())
    }

    fn flush(&self) -> StorageResult<()> {
        Ok(())
    }
}

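// A minimal sketch of the `RangeKv` contract on the `BTreeMap` backend:
// entries are ordered by full key, `rev_range` mirrors `range`, and a
// tombstone is stored as `None`. The table id, keys, and values below are
// arbitrary test data.
#[cfg(test)]
mod btreemap_range_kv_tests {
    use super::*;

    #[test]
    fn test_btreemap_range_kv_contract() {
        let store = BTreeMapRangeKv::default();
        let full_key = |k: &[u8]| {
            FullKey::new(
                TableId::default(),
                TableKey(Bytes::copy_from_slice(k)),
                0,
            )
        };
        store
            .ingest_batch(
                vec![
                    (full_key(b"a"), Some(Bytes::from_static(b"v1"))),
                    // A deleted key is kept as a `None` tombstone.
                    (full_key(b"b"), None),
                ]
                .into_iter(),
            )
            .unwrap();
        let forward = store.range((Unbounded, Unbounded), None).unwrap();
        let backward = store.rev_range((Unbounded, Unbounded), None).unwrap();
        assert_eq!(2, forward.len());
        assert_eq!(forward.iter().rev().cloned().collect::<Vec<_>>(), backward);
        // `flush` is a no-op for the in-memory backend.
        store.flush().unwrap();
    }
}
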
pub mod sled {
    use std::fs::create_dir_all;
    use std::ops::RangeBounds;

    use bytes::Bytes;
    use risingwave_hummock_sdk::key::FullKey;

    use crate::error::StorageResult;
    use crate::memory::{BytesFullKey, BytesFullKeyRange, RangeKv, RangeKvStateStore};

    #[derive(Clone)]
    pub struct SledRangeKv {
        inner: sled::Db,
    }

    impl SledRangeKv {
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            SledRangeKv {
                inner: sled::open(path).expect("open"),
            }
        }

        pub fn new_temp() -> Self {
            create_dir_all("./.risingwave/sled").expect("should create");
            let path = tempfile::TempDir::new_in("./.risingwave/sled")
                .expect("find temp dir")
                .keep();
            Self::new(path)
        }
    }

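    // Values are stored with a one-byte tag: `EMPTY` marks a tombstone
    // (a `None` value), while `NON_EMPTY` prefixes the actual value bytes.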
    const EMPTY: u8 = 1;
    const NON_EMPTY: u8 = 0;

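    // Note: sled compares raw encoded bytes, so a scan over encoded bounds may
    // return keys of a different length that the decoded `FullKey` range would
    // exclude (see `test_filter_variable_key_length_false_positive` below).
    // Both scans therefore re-check every decoded key against the original
    // bounds.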
    impl RangeKv for SledRangeKv {
        fn range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self.inner.range((left_encoded, right_encoded)).take(limit) {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        fn rev_range(
            &self,
            range: BytesFullKeyRange,
            limit: Option<usize>,
        ) -> StorageResult<Vec<(BytesFullKey, Option<Bytes>)>> {
            let (left, right) = range;
            let full_key_ref_bound = (
                left.as_ref().map(FullKey::to_ref),
                right.as_ref().map(FullKey::to_ref),
            );
            let left_encoded = left.as_ref().map(|key| key.to_ref().encode_reverse_epoch());
            let right_encoded = right
                .as_ref()
                .map(|key| key.to_ref().encode_reverse_epoch());
            let limit = limit.unwrap_or(usize::MAX);
            let mut ret = vec![];
            for result in self
                .inner
                .range((left_encoded, right_encoded))
                .rev()
                .take(limit)
            {
                let (key, value) = result?;
                let full_key = FullKey::decode_reverse_epoch(key.as_ref()).copy_into();
                if !full_key_ref_bound.contains(&full_key.to_ref()) {
                    continue;
                }
                let value = match value.as_ref() {
                    [EMPTY] => None,
                    [NON_EMPTY, rest @ ..] => Some(Bytes::from(Vec::from(rest))),
                    _ => unreachable!("malformed value: {:?}", value),
                };
                ret.push((full_key, value))
            }
            Ok(ret)
        }

        fn ingest_batch(
            &self,
            kv_pairs: impl Iterator<Item = (BytesFullKey, Option<Bytes>)>,
        ) -> StorageResult<()> {
            let mut batch = sled::Batch::default();
            for (key, value) in kv_pairs {
                let encoded_key = key.encode_reverse_epoch();
                let key = sled::IVec::from(encoded_key);
                let mut buffer =
                    Vec::with_capacity(value.as_ref().map(|v| v.len()).unwrap_or_default() + 1);
                if let Some(value) = value {
                    buffer.push(NON_EMPTY);
                    buffer.extend_from_slice(value.as_ref());
                } else {
                    buffer.push(EMPTY);
                }
                let value = sled::IVec::from(buffer);
                batch.insert(key, value);
            }
            self.inner.apply_batch(batch)?;
            Ok(())
        }

        fn flush(&self) -> StorageResult<()> {
            Ok(self.inner.flush().map(|_| {})?)
        }
    }

    pub type SledStateStore = RangeKvStateStore<SledRangeKv>;

    impl SledStateStore {
        pub fn new(path: impl AsRef<std::path::Path>) -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new(path),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }

        pub fn new_temp() -> Self {
            RangeKvStateStore {
                inner: SledRangeKv::new_temp(),
                tables: Default::default(),
                vectors: Default::default(),
            }
        }
    }

    #[cfg(test)]
    mod test {
        use std::ops::{Bound, RangeBounds};

        use bytes::Bytes;
        use risingwave_common::catalog::TableId;
        use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK;
        use risingwave_hummock_sdk::EpochWithGap;
        use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};

        use crate::memory::RangeKv;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_filter_variable_key_length_false_positive() {
            let table_id = TableId::new(233);
            let epoch = u64::MAX - u64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]);
            let excluded_short_table_key = [0, 1, 0, 0];
            let included_long_table_key = [0, 1, 0, 0, 1, 2];
            let left_table_key = [0, 1, 0, 0, 1];
            let right_table_key = [0, 1, 1, 1];

            let to_full_key = |table_key: &[u8]| FullKey {
                user_key: UserKey {
                    table_id,
                    table_key: TableKey(Bytes::from(table_key.to_vec())),
                },
                epoch_with_gap: EpochWithGap::new_from_epoch(epoch & !EPOCH_SPILL_TIME_MASK),
            };

            let left_full_key = to_full_key(&left_table_key[..]);
            let right_full_key = to_full_key(&right_table_key[..]);
            let included_long_full_key = to_full_key(&included_long_table_key[..]);
            let excluded_short_full_key = to_full_key(&excluded_short_table_key[..]);

            assert!(
                (
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&included_long_full_key.to_ref())
            );
            assert!(
                !(
                    Bound::Included(left_full_key.to_ref()),
                    Bound::Included(right_full_key.to_ref())
                )
                    .contains(&excluded_short_full_key.to_ref())
            );

            let left_encoded = left_full_key.encode_reverse_epoch();
            let right_encoded = right_full_key.encode_reverse_epoch();

            assert!(
                (
                    Bound::Included(left_encoded.clone()),
                    Bound::Included(right_encoded.clone())
                )
                    .contains(&included_long_full_key.encode_reverse_epoch())
            );
            assert!(
                (
                    Bound::Included(left_encoded),
                    Bound::Included(right_encoded)
                )
                    .contains(&excluded_short_full_key.encode_reverse_epoch())
            );

            let sled_range_kv = SledRangeKv::new_temp();
            sled_range_kv
                .ingest_batch(
                    vec![
                        (included_long_full_key.clone(), None),
                        (excluded_short_full_key, None),
                    ]
                    .into_iter(),
                )
                .unwrap();
            let kvs = sled_range_kv
                .range(
                    (
                        Bound::Included(left_full_key),
                        Bound::Included(right_full_key),
                    ),
                    None,
                )
                .unwrap();
            assert_eq!(1, kvs.len());
            assert_eq!(included_long_full_key.to_ref(), kvs[0].0.to_ref());
            assert!(kvs[0].1.is_none());
        }
    }
}

mod batched_iter {
    use super::*;

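    /// An iterator that fetches from the underlying `RangeKv` in batches of
    /// `BATCH_SIZE`, so the backing store is never locked for a whole scan.
    /// After each batch the range is shrunk past the last returned key, so
    /// keys that land between batches are observed at most once and never
    /// yielded twice.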
    pub struct Iter<R: RangeKv> {
        inner: R,
        range: BytesFullKeyRange,
        current: std::vec::IntoIter<(FullKey<Bytes>, Option<Bytes>)>,
        rev: bool,
    }

    impl<R: RangeKv> Iter<R> {
        pub fn new(inner: R, range: BytesFullKeyRange, rev: bool) -> Self {
            Self {
                inner,
                range,
                rev,
                current: Vec::new().into_iter(),
            }
        }
    }

    impl<R: RangeKv> Iter<R> {
        const BATCH_SIZE: usize = 256;

        fn refill(&mut self) -> StorageResult<()> {
            assert!(self.current.is_empty());

            let batch = if self.rev {
                self.inner.rev_range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            } else {
                self.inner.range(
                    (self.range.0.clone(), self.range.1.clone()),
                    Some(Self::BATCH_SIZE),
                )?
            };

            if let Some((last_key, _)) = batch.last() {
                let full_key = FullKey::new_with_gap_epoch(
                    last_key.user_key.table_id,
                    TableKey(last_key.user_key.table_key.0.clone()),
                    last_key.epoch_with_gap,
                );
                if self.rev {
                    self.range.1 = Bound::Excluded(full_key);
                } else {
                    self.range.0 = Bound::Excluded(full_key);
                }
            }
            self.current = batch.into_iter();
            Ok(())
        }
    }

    impl<R: RangeKv> Iter<R> {
        #[allow(clippy::type_complexity)]
        pub fn next(&mut self) -> StorageResult<Option<(BytesFullKey, Option<Bytes>)>> {
            match self.current.next() {
                Some((key, value)) => Ok(Some((key, value))),
                None => {
                    self.refill()?;
                    Ok(self.current.next())
                }
            }
        }
    }

    #[cfg(test)]
    mod tests {
        use rand::Rng;

        use super::*;
        use crate::memory::sled::SledRangeKv;

        #[test]
        fn test_btreemap_iter_chaos() {
            let map = Arc::new(RwLock::new(BTreeMap::new()));
            test_iter_chaos_inner(map, 1000);
        }

        #[cfg(not(madsim))]
        #[test]
        fn test_sled_iter_chaos() {
            let map = SledRangeKv::new_temp();
            test_iter_chaos_inner(map, 100);
        }

        fn test_iter_chaos_inner(map: impl RangeKv, count: usize) {
            let key_range = 1..=10000;
            let num_to_bytes = |k: i32| Bytes::from(format!("{:06}", k).as_bytes().to_vec());
            let num_to_full_key =
                |k: i32| FullKey::new(TableId::default(), TableKey(num_to_bytes(k)), 0);
            #[allow(clippy::mutable_key_type)]
            map.ingest_batch(key_range.clone().map(|k| {
                let key = num_to_full_key(k);
                let b = key.user_key.table_key.0.clone();

                (key, Some(b))
            }))
            .unwrap();

            let rand_bound = || {
                let key = rand::rng().random_range(key_range.clone());
                let key = num_to_full_key(key);
                match rand::rng().random_range(1..=5) {
                    1 | 2 => Bound::Included(key),
                    3 | 4 => Bound::Excluded(key),
                    _ => Bound::Unbounded,
                }
            };

            for _ in 0..count {
                let range = loop {
                    let range = (rand_bound(), rand_bound());
                    let (start, end) = (range.start_bound(), range.end_bound());

                    match (start, end) {
                        (Bound::Excluded(s), Bound::Excluded(e)) if s == e => {
                            continue;
                        }
                        (
                            Bound::Included(s) | Bound::Excluded(s),
                            Bound::Included(e) | Bound::Excluded(e),
                        ) if s > e => {
                            continue;
                        }
                        _ => break range,
                    }
                };

                let v1 = {
                    let mut v = vec![];
                    let mut iter = Iter::new(map.clone(), range.clone(), false);
                    while let Some((key, value)) = iter.next().unwrap() {
                        v.push((key, value));
                    }
                    v
                };
                let v2 = map.range(range, None).unwrap();

                assert_eq!(v1, v2);
            }
        }
    }
}

pub type MemoryStateStore = RangeKvStateStore<BTreeMapRangeKv>;

struct TableState {
    init_epoch: u64,
    next_epochs: BTreeMap<u64, u64>,
    latest_sealed_epoch: Option<u64>,
    sealing_epochs: BTreeMap<u64, BitmapBuilder>,
}

impl TableState {
    fn new(init_epoch: u64) -> Self {
        Self {
            init_epoch,
            next_epochs: Default::default(),
            latest_sealed_epoch: None,
            sealing_epochs: Default::default(),
        }
    }

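    /// Cooperatively spin (yielding to the runtime between attempts) until
    /// `epoch` is the initial epoch of the table or has been sealed on every
    /// vnode.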
    async fn wait_epoch(
        tables: &parking_lot::Mutex<HashMap<TableId, Self>>,
        table_id: TableId,
        epoch: u64,
    ) {
        loop {
            {
                let tables = tables.lock();
                let table_state = tables.get(&table_id).expect("should exist");
                assert!(epoch >= table_state.init_epoch);
                if epoch == table_state.init_epoch {
                    return;
                }
                if let Some(latest_sealed_epoch) = table_state.latest_sealed_epoch
                    && latest_sealed_epoch >= epoch
                {
                    return;
                }
            }
            yield_now().await;
        }
    }
}

type InMemVectorStore = Arc<RwLock<HashMap<TableId, Vec<(Vector, Bytes, u64)>>>>;

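/// A state store over an arbitrary `RangeKv` backend, instantiated below as
/// the in-memory `MemoryStateStore` (over a `BTreeMap`) and the sled-backed
/// `SledStateStore`. Flushes go straight into the local backend; nothing is
/// uploaded to shared storage.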
#[derive(Clone, Default)]
pub struct RangeKvStateStore<R: RangeKv> {
    inner: R,
    tables: Arc<parking_lot::Mutex<HashMap<TableId, TableState>>>,

    vectors: InMemVectorStore,
}

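/// Widens a table key range into a full key range. Bounds are stamped with
/// epochs so that every version of an included boundary key is covered
/// (`HummockEpoch::MAX` sorts first and `0` sorts last within a user key),
/// and an unbounded end is clamped to the start of the next table id.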
fn to_full_key_range<R, B>(table_id: TableId, table_key_range: R) -> BytesFullKeyRange
where
    R: RangeBounds<B> + Send,
    B: AsRef<[u8]>,
{
    let start = match table_key_range.start_bound() {
        Included(k) => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            HummockEpoch::MAX,
        )),
        Excluded(k) => Excluded(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            0,
        )),
        Unbounded => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(b"".to_vec())),
            HummockEpoch::MAX,
        )),
    };
    let end = match table_key_range.end_bound() {
        Included(k) => Included(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            0,
        )),
        Excluded(k) => Excluded(FullKey::new(
            table_id,
            TableKey(Bytes::from(k.as_ref().to_vec())),
            HummockEpoch::MAX,
        )),
        Unbounded => {
            if let Some(next_table_id) = table_id.as_raw_id().checked_add(1) {
                Excluded(FullKey::new(
                    next_table_id.into(),
                    TableKey(Bytes::from(b"".to_vec())),
                    HummockEpoch::MAX,
                ))
            } else {
                Unbounded
            }
        }
    };
    (start, end)
}

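// A minimal sketch of how `to_full_key_range` is used with a `RangeKv`
// backend: the widened range covers every epoch of the keys inside the
// table key range and nothing from other tables. Table ids, keys, and
// epochs below are arbitrary test data.
#[cfg(test)]
mod full_key_range_tests {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[test]
    fn test_to_full_key_range_covers_one_table() {
        let store = BTreeMapRangeKv::default();
        let full_key = |table_id: u32, k: &[u8], epoch: u64| {
            FullKey::new(
                TableId::new(table_id),
                TableKey(Bytes::copy_from_slice(k)),
                epoch,
            )
        };
        store
            .ingest_batch(
                vec![
                    (full_key(1, b"a", test_epoch(1)), Some(Bytes::from_static(b"v1"))),
                    (full_key(1, b"a", test_epoch(2)), Some(Bytes::from_static(b"v2"))),
                    (full_key(2, b"a", test_epoch(1)), Some(Bytes::from_static(b"other"))),
                ]
                .into_iter(),
            )
            .unwrap();
        // An unbounded table key range over table 1 sees both versions of
        // "a" but nothing from table 2.
        let kvs = store
            .range(to_full_key_range::<_, Bytes>(TableId::new(1), ..), None)
            .unwrap();
        assert_eq!(2, kvs.len());
        assert!(
            kvs.iter()
                .all(|(key, _)| key.user_key.table_id == TableId::new(1))
        );
    }
}
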
impl MemoryStateStore {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn shared() -> Self {
        static STORE: LazyLock<MemoryStateStore> = LazyLock::new(MemoryStateStore::new);
        STORE.clone()
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
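    /// Scans `key_range` of `table_id` at `epoch`, returning at most `limit`
    /// encoded-key/value pairs. For each user key only the newest version
    /// written at or before `epoch` is considered, and tombstones are
    /// filtered out.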
    fn scan(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
        limit: Option<usize>,
    ) -> StorageResult<Vec<(Bytes, Bytes)>> {
        let mut data = vec![];
        if limit == Some(0) {
            return Ok(vec![]);
        }
        let mut last_user_key = None;
        for (key, value) in self
            .inner
            .range(to_full_key_range(table_id, key_range), None)?
        {
            if key.epoch_with_gap.pure_epoch() > epoch {
                continue;
            }
            if Some(&key.user_key) != last_user_key.as_ref() {
                if let Some(value) = value {
                    data.push((Bytes::from(key.encode()), value.clone()));
                }
                last_user_key = Some(key.user_key.clone());
            }
            if let Some(limit) = limit
                && data.len() >= limit
            {
                break;
            }
        }
        Ok(data)
    }
}

#[derive(Clone)]
pub struct RangeKvStateStoreReadSnapshot<R: RangeKv> {
    inner: RangeKvStateStore<R>,
    epoch: u64,
    table_id: TableId,
}

impl<R: RangeKv> StateStoreGet for RangeKvStateStoreReadSnapshot<R> {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        self.inner
            .get_keyed_row_impl(key, self.epoch, self.table_id)
            .and_then(|option| {
                if let Some((key, value)) = option {
                    on_key_value_fn(key.to_ref(), value.as_ref()).map(Some)
                } else {
                    Ok(None)
                }
            })
    }
}

impl<R: RangeKv> StateStoreRead for RangeKvStateStoreReadSnapshot<R> {
    type Iter = RangeKvStateStoreIter<R>;
    type RevIter = RangeKvStateStoreRevIter<R>;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter> {
        self.inner.iter_impl(key_range, self.epoch, self.table_id)
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter> {
        self.inner
            .rev_iter_impl(key_range, self.epoch, self.table_id)
    }
}

impl<R: RangeKv> StateStoreReadVector for RangeKvStateStoreReadSnapshot<R> {
    async fn nearest<'a, O: Send + 'a>(
        &'a self,
        vec: VectorRef<'a>,
        options: VectorNearestOptions,
        on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
    ) -> StorageResult<Vec<O>> {
        fn nearest_impl<'a, M: MeasureDistanceBuilder, O>(
            store: &'a InMemVectorStore,
            epoch: u64,
            table_id: TableId,
            vec: VectorRef<'a>,
            options: VectorNearestOptions,
            on_nearest_item_fn: impl OnNearestItemFn<'a, O>,
        ) -> Vec<O> {
            let mut builder = NearestBuilder::<'_, O, M>::new(vec, options.top_n);
            builder.add(
                store
                    .read()
                    .get(&table_id)
                    .map(|vec| vec.iter())
                    .into_iter()
                    .flatten()
                    .filter(|(_, _, vector_epoch)| epoch >= *vector_epoch)
                    .map(|(vec, info, _)| (vec.to_ref(), info.as_ref())),
                on_nearest_item_fn,
            );
            builder.finish()
        }
        dispatch_distance_measurement!(options.measure, MeasurementType, {
            Ok(nearest_impl::<MeasurementType, O>(
                &self.inner.vectors,
                self.epoch,
                self.table_id,
                vec,
                options,
                on_nearest_item_fn,
            ))
        })
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn get_keyed_row_impl(
        &self,
        key: TableKey<Bytes>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<Option<StateStoreKeyedRow>> {
        let range_bounds = (Bound::Included(key.clone()), Bound::Included(key));
        let res = self.scan(range_bounds, epoch, table_id, Some(1))?;

        Ok(match res.as_slice() {
            [] => None,
            [(key, value)] => Some((
                FullKey::decode(key.as_ref()).to_vec().into_bytes(),
                value.clone(),
            )),
            _ => unreachable!(),
        })
    }

    fn iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreIter<R>> {
        Ok(RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                false,
            ),
            epoch,
            true,
        ))
    }

    fn rev_iter_impl(
        &self,
        key_range: TableKeyRange,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<RangeKvStateStoreRevIter<R>> {
        Ok(RangeKvStateStoreRevIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(table_id, key_range),
                true,
            ),
            epoch,
            true,
        ))
    }
}

impl<R: RangeKv> StateStoreReadLog for RangeKvStateStore<R> {
    type ChangeLogIter = RangeKvStateStoreChangeLogIter<R>;

    async fn next_epoch(&self, epoch: u64, options: NextEpochOptions) -> StorageResult<u64> {
        loop {
            {
                let tables = self.tables.lock();
                let Some(tables) = tables.get(&options.table_id) else {
                    return Err(HummockError::next_epoch(format!(
                        "table {} not exist",
                        options.table_id
                    ))
                    .into());
                };
                if let Some(next_epoch) = tables.next_epochs.get(&epoch) {
                    break Ok(*next_epoch);
                }
            }
            yield_now().await;
        }
    }

    async fn iter_log(
        &self,
        (min_epoch, max_epoch): (u64, u64),
        key_range: TableKeyRange,
        options: ReadLogOptions,
    ) -> StorageResult<Self::ChangeLogIter> {
        let new_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range.clone()),
                false,
            ),
            max_epoch,
            true,
        );
        let old_value_iter = RangeKvStateStoreIter::new(
            batched_iter::Iter::new(
                self.inner.clone(),
                to_full_key_range(options.table_id, key_range),
                false,
            ),
            min_epoch,
            false,
        );
        RangeKvStateStoreChangeLogIter::new(new_value_iter, old_value_iter)
    }
}

impl<R: RangeKv> RangeKvStateStore<R> {
    fn new_read_snapshot_impl(
        &self,
        epoch: u64,
        table_id: TableId,
    ) -> RangeKvStateStoreReadSnapshot<R> {
        RangeKvStateStoreReadSnapshot {
            inner: self.clone(),
            epoch,
            table_id,
        }
    }

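    /// Ingests a batch of key-value pairs at `epoch`. `delete_ranges` are
    /// expanded into per-key tombstones for every existing key they cover
    /// before the batch is written. Returns the approximate number of bytes
    /// ingested.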
    pub(crate) fn ingest_batch(
        &self,
        mut kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
        delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
        epoch: u64,
        table_id: TableId,
    ) -> StorageResult<usize> {
        let mut delete_keys = BTreeSet::new();
        for del_range in delete_ranges {
            for (key, _) in self.inner.range(
                (
                    del_range
                        .0
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                    del_range
                        .1
                        .map(|table_key| FullKey::new(table_id, TableKey(table_key), epoch)),
                ),
                None,
            )? {
                delete_keys.insert(key.user_key.table_key);
            }
        }
        for key in delete_keys {
            kv_pairs.push((key, StorageValue::new_delete()));
        }

        let mut size = 0;
        self.inner
            .ingest_batch(kv_pairs.into_iter().map(|(key, value)| {
                size += key.len() + value.size();
                (FullKey::new(table_id, key, epoch), value.user_value)
            }))?;
        Ok(size)
    }

    fn ingest_vectors(&self, table_id: TableId, epoch: u64, vecs: Vec<(Vector, Bytes)>) {
        self.vectors
            .write()
            .entry(table_id)
            .or_default()
            .extend(vecs.into_iter().map(|(vec, info)| (vec, info, epoch)));
    }
}

impl<R: RangeKv> StateStore for RangeKvStateStore<R> {
    type Local = RangeKvLocalStateStore<R>;
    type ReadSnapshot = RangeKvStateStoreReadSnapshot<R>;
    type VectorWriter = RangeKvLocalStateStore<R>;

    async fn try_wait_epoch(
        &self,
        _epoch: HummockReadEpoch,
        _options: TryWaitEpochOptions,
    ) -> StorageResult<()> {
        Ok(())
    }

    async fn new_local(&self, option: NewLocalOptions) -> Self::Local {
        RangeKvLocalStateStore::new(self.clone(), option)
    }

    async fn new_read_snapshot(
        &self,
        epoch: HummockReadEpoch,
        options: NewReadSnapshotOptions,
    ) -> StorageResult<Self::ReadSnapshot> {
        Ok(self.new_read_snapshot_impl(epoch.get_epoch(), options.table_id))
    }

    async fn new_vector_writer(&self, options: NewVectorWriterOptions) -> Self::VectorWriter {
        RangeKvLocalStateStore::new(
            self.clone(),
            NewLocalOptions {
                table_id: options.table_id,
                fragment_id: FragmentId::default(),
                op_consistency_level: Default::default(),
                table_option: Default::default(),
                is_replicated: false,
                vnodes: Arc::new(Bitmap::from_bool_slice(&[true])),
                upload_on_flush: true,
            },
        )
    }
}

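// A minimal sketch of the shared-store read path: ingest two versions of a
// key, then read through `scan` at different epochs to observe snapshot
// isolation. Keys and epochs are arbitrary test data; a richer variant
// lives in the `tests` module at the bottom of this file.
#[cfg(test)]
mod snapshot_read_sketch {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[test]
    fn test_scan_pins_epoch() {
        let store = MemoryStateStore::new();
        let table_id = TableId::default();
        let key = TableKey(Bytes::from_static(b"k"));
        for (epoch, value) in [(test_epoch(1), &b"v1"[..]), (test_epoch(2), &b"v2"[..])] {
            store
                .ingest_batch(
                    vec![(key.clone(), StorageValue::new_put(value.to_vec()))],
                    vec![],
                    epoch,
                    table_id,
                )
                .unwrap();
        }
        // Reading at epoch 1 still sees "v1" even though epoch 2 overwrote it.
        let key_range = (Bound::Included(key.clone()), Bound::Included(key));
        let kvs = store.scan(key_range, test_epoch(1), table_id, None).unwrap();
        assert_eq!(1, kvs.len());
        assert_eq!(Bytes::from_static(b"v1"), kvs[0].1);
    }
}
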
pub struct RangeKvLocalStateStore<R: RangeKv> {
    mem_table: MemTable,
    vectors: Vec<(Vector, Bytes)>,
    inner: RangeKvStateStore<R>,

    epoch: Option<EpochPair>,

    table_id: TableId,
    op_consistency_level: OpConsistencyLevel,
    vnodes: Arc<Bitmap>,
}

impl<R: RangeKv> RangeKvLocalStateStore<R> {
    pub fn new(inner: RangeKvStateStore<R>, option: NewLocalOptions) -> Self {
        Self {
            inner,
            mem_table: MemTable::new(option.table_id, option.op_consistency_level.clone()),
            epoch: None,
            table_id: option.table_id,
            op_consistency_level: option.op_consistency_level,
            vnodes: option.vnodes,
            vectors: vec![],
        }
    }

    fn epoch(&self) -> u64 {
        self.epoch.expect("should have set the epoch").curr
    }
}

impl<R: RangeKv> StateStoreGet for RangeKvLocalStateStore<R> {
    async fn on_key_value<'a, O: Send + 'a>(
        &'a self,
        key: TableKey<Bytes>,
        _read_options: ReadOptions,
        on_key_value_fn: impl KeyValueFn<'a, O>,
    ) -> StorageResult<Option<O>> {
        if let Some((key, value)) = match self.mem_table.buffer.get(&key) {
            None => self
                .inner
                .get_keyed_row_impl(key, self.epoch(), self.table_id)?,
            Some(op) => match op {
                KeyOp::Insert(value) | KeyOp::Update((_, value)) => Some((
                    FullKey::new(self.table_id, key, self.epoch()),
                    value.clone(),
                )),
                KeyOp::Delete(_) => None,
            },
        } {
            Ok(Some(on_key_value_fn(key.to_ref(), value.as_ref())?))
        } else {
            Ok(None)
        }
    }
}

impl<R: RangeKv> LocalStateStore for RangeKvLocalStateStore<R> {
    type FlushedSnapshotReader = RangeKvStateStoreReadSnapshot<R>;

    type Iter<'a> = impl StateStoreIter + 'a;
    type RevIter<'a> = impl StateStoreIter + 'a;

    async fn iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::Iter<'_>> {
        let iter = self
            .inner
            .iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            false,
        ))))
    }

    async fn rev_iter(
        &self,
        key_range: TableKeyRange,
        _read_options: ReadOptions,
    ) -> StorageResult<Self::RevIter<'_>> {
        let iter = self
            .inner
            .rev_iter_impl(key_range.clone(), self.epoch(), self.table_id)?;
        Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream(
            self.mem_table.rev_iter(key_range),
            iter.into_stream(to_owned_item),
            self.table_id,
            self.epoch(),
            true,
        ))))
    }

    fn insert(
        &mut self,
        key: TableKey<Bytes>,
        new_val: Bytes,
        old_val: Option<Bytes>,
    ) -> StorageResult<()> {
        match old_val {
            None => self.mem_table.insert(key, new_val)?,
            Some(old_val) => self.mem_table.update(key, old_val, new_val)?,
        };
        Ok(())
    }

    fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()> {
        Ok(self.mem_table.delete(key, old_val)?)
    }

    async fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> StorageResult<Arc<Bitmap>> {
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(
                &self.inner.tables,
                self.table_id,
                self.epoch.expect("should have init").prev,
            )
            .await;
        }
        Ok(std::mem::replace(&mut self.vnodes, vnodes))
    }

    fn get_table_watermark(&self, _vnode: VirtualNode) -> Option<Bytes> {
        None
    }

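    // The flushed-snapshot reader reads at `MAX_EPOCH`, i.e. it observes
    // everything already flushed to the shared store regardless of the
    // local epoch.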
    fn new_flushed_snapshot_reader(&self) -> Self::FlushedSnapshotReader {
        self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id)
    }
}

impl<R: RangeKv> StateStoreWriteEpochControl for RangeKvLocalStateStore<R> {
    async fn flush(&mut self) -> StorageResult<usize> {
        let buffer = self.mem_table.drain().into_parts();
        let mut kv_pairs = Vec::with_capacity(buffer.len());
        let sanity_check_read_snapshot = if sanity_check_enabled() {
            Some(self.inner.new_read_snapshot_impl(MAX_EPOCH, self.table_id))
        } else {
            None
        };
        for (key, key_op) in buffer {
            match key_op {
                KeyOp::Insert(value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_insert_sanity_check(
                            self.table_id,
                            &key,
                            &value,
                            sanity_check_read_snapshot,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(value)));
                }
                KeyOp::Delete(old_value) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_delete_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            sanity_check_read_snapshot,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_delete()));
                }
                KeyOp::Update((old_value, new_value)) => {
                    if let Some(sanity_check_read_snapshot) = &sanity_check_read_snapshot {
                        do_update_sanity_check(
                            self.table_id,
                            &key,
                            &old_value,
                            &new_value,
                            sanity_check_read_snapshot,
                            &self.op_consistency_level,
                        )
                        .await?;
                    }
                    kv_pairs.push((key, StorageValue::new_put(new_value)));
                }
            }
        }
        let epoch = self.epoch();
        self.inner
            .ingest_vectors(self.table_id, epoch, take(&mut self.vectors));
        self.inner
            .ingest_batch(kv_pairs, vec![], epoch, self.table_id)
    }

    async fn init(&mut self, options: InitOptions) -> StorageResult<()> {
        assert_eq!(
            self.epoch.replace(options.epoch),
            None,
            "epoch in local state store of table id {:?} is initialized more than once",
            self.table_id
        );
        self.inner
            .tables
            .lock()
            .entry(self.table_id)
            .or_insert_with(|| TableState::new(options.epoch.prev))
            .next_epochs
            .insert(options.epoch.prev, options.epoch.curr);
        if self.vnodes.len() > 1 {
            TableState::wait_epoch(&self.inner.tables, self.table_id, options.epoch.prev).await;
        }

        Ok(())
    }

    fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions) {
        assert!(!self.mem_table.is_dirty());
        if let Some(value_checker) = opts.switch_op_consistency_level {
            self.mem_table.op_consistency_level.update(&value_checker);
        }
        let epoch = self
            .epoch
            .as_mut()
            .expect("should have init epoch before seal the first epoch");
        let prev_epoch = epoch.curr;
        epoch.prev = prev_epoch;
        epoch.curr = next_epoch;
        assert!(
            next_epoch > prev_epoch,
            "new epoch {} should be greater than current epoch: {}",
            next_epoch,
            prev_epoch
        );

        let mut tables = self.inner.tables.lock();
        let table_state = tables
            .get_mut(&self.table_id)
            .expect("should be set when init");

        table_state.next_epochs.insert(prev_epoch, next_epoch);
        if self.vnodes.len() > 1 {
            let sealing_epoch_vnodes = table_state
                .sealing_epochs
                .entry(prev_epoch)
                .or_insert_with(|| BitmapBuilder::zeroed(self.vnodes.len()));
            assert_eq!(self.vnodes.len(), sealing_epoch_vnodes.len());
            for vnode in self.vnodes.iter_ones() {
                assert!(!sealing_epoch_vnodes.is_set(vnode));
                sealing_epoch_vnodes.set(vnode, true);
            }
            if (0..self.vnodes.len()).all(|vnode| sealing_epoch_vnodes.is_set(vnode)) {
                let (all_sealed_epoch, _) =
                    table_state.sealing_epochs.pop_first().expect("non-empty");
                assert_eq!(
                    all_sealed_epoch, prev_epoch,
                    "new all_sealed_epoch must be the current prev epoch"
                );
                if let Some(prev_latest_sealed_epoch) =
                    table_state.latest_sealed_epoch.replace(prev_epoch)
                {
                    assert!(prev_epoch > prev_latest_sealed_epoch);
                }
            }
        }

        if let Some((direction, watermarks, _watermark_type)) = opts.table_watermarks {
            let delete_ranges = watermarks
                .iter()
                .flat_map(|vnode_watermark| {
                    let inner_range = match direction {
                        WatermarkDirection::Ascending => {
                            (Unbounded, Excluded(vnode_watermark.watermark().clone()))
                        }
                        WatermarkDirection::Descending => {
                            (Excluded(vnode_watermark.watermark().clone()), Unbounded)
                        }
                    };
                    vnode_watermark
                        .vnode_bitmap()
                        .iter_vnodes()
                        .map(move |vnode| {
                            let (start, end) =
                                prefixed_range_with_vnode(inner_range.clone(), vnode);
                            (start.map(|key| key.0), end.map(|key| key.0))
                        })
                })
                .collect_vec();
            if let Err(e) =
                self.inner
                    .ingest_batch(Vec::new(), delete_ranges, self.epoch(), self.table_id)
            {
                error!(error = %e.as_report(), "failed to write delete ranges of table watermark");
            }
        }
    }

    async fn try_flush(&mut self) -> StorageResult<()> {
        Ok(())
    }
}

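// A minimal sketch of the local state store write path: create a local
// store, `init` it with an epoch pair, buffer a write, and `flush` it into
// the shared store. `InitOptions::new` is assumed to be the usual
// constructor taking an `EpochPair`; the options literal mirrors
// `new_vector_writer` above, and keys/epochs are arbitrary test data.
#[cfg(test)]
mod local_write_path_sketch {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;

    #[tokio::test]
    async fn test_local_write_path() {
        let store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        let mut local = store
            .new_local(NewLocalOptions {
                table_id,
                fragment_id: FragmentId::default(),
                op_consistency_level: Default::default(),
                table_option: Default::default(),
                is_replicated: false,
                vnodes: Arc::new(Bitmap::from_bool_slice(&[true])),
                upload_on_flush: true,
            })
            .await;
        // Assumed constructor: `InitOptions::new(EpochPair)`.
        let epoch = EpochPair::new_test_epoch(test_epoch(1));
        local.init(InitOptions::new(epoch)).await.unwrap();
        let key = TableKey(Bytes::from_static(b"k"));
        LocalStateStore::insert(&mut local, key.clone(), Bytes::from_static(b"v"), None)
            .unwrap();
        // `flush` drains the mem-table into the shared store at the current epoch.
        local.flush().await.unwrap();
        let kvs = store
            .scan(
                (Bound::Included(key.clone()), Bound::Included(key)),
                test_epoch(1),
                table_id,
                None,
            )
            .unwrap();
        assert_eq!(1, kvs.len());
        assert_eq!(Bytes::from_static(b"v"), kvs[0].1);
    }
}
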
impl<R: RangeKv> StateStoreWriteVector for RangeKvLocalStateStore<R> {
    fn insert(&mut self, vec: VectorRef<'_>, info: Bytes) -> StorageResult<()> {
        self.vectors.push((vec.to_owned_scalar(), info));
        Ok(())
    }
}

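/// Forward iterator over a `RangeKv`: for each user key it yields the newest
/// version visible at `epoch` (inclusive or exclusive, depending on
/// `is_inclusive_epoch`) and skips tombstones.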
pub struct RangeKvStateStoreIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    last_key: Option<UserKey<Bytes>>,

    item_buffer: Option<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            last_key: None,
            item_buffer: None,
        }
    }
}

impl<R: RangeKv> StateStoreIter for RangeKvStateStoreIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreIter<R> {
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer = None;
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }
            if Some(key.user_key.as_ref()) != self.last_key.as_ref().map(|key| key.as_ref()) {
                self.last_key = Some(key.user_key.clone());
                if let Some(value) = value {
                    self.item_buffer = Some((key, value));
                    break;
                }
            }
        }
        Ok(())
    }
}

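/// Backward iterator over a `RangeKv`. A reverse scan yields the versions of
/// a user key from oldest to newest, so the newest visible version is kept
/// buffered until the user key changes; if the newest visible version turns
/// out to be a tombstone, the buffered entry for that key is dropped.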
pub struct RangeKvStateStoreRevIter<R: RangeKv> {
    inner: batched_iter::Iter<R>,

    epoch: HummockEpoch,
    is_inclusive_epoch: bool,

    item_buffer: VecDeque<StateStoreKeyedRow>,
}

impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    pub fn new(
        inner: batched_iter::Iter<R>,
        epoch: HummockEpoch,
        is_inclusive_epoch: bool,
    ) -> Self {
        Self {
            inner,
            epoch,
            is_inclusive_epoch,
            item_buffer: VecDeque::default(),
        }
    }
}

impl<R: RangeKv> StateStoreIter for RangeKvStateStoreRevIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
        self.next_inner()?;
        Ok(self
            .item_buffer
            .back()
            .map(|(key, value)| (key.to_ref(), value.as_ref())))
    }
}

impl<R: RangeKv> RangeKvStateStoreRevIter<R> {
    fn next_inner(&mut self) -> StorageResult<()> {
        self.item_buffer.pop_back();
        while let Some((key, value)) = self.inner.next()? {
            let epoch = key.epoch_with_gap.pure_epoch();
            if epoch > self.epoch {
                continue;
            }
            if epoch == self.epoch && !self.is_inclusive_epoch {
                continue;
            }

            let v = match value {
                Some(v) => v,
                None => {
                    if let Some(last_key) = self.item_buffer.front()
                        && key.user_key.as_ref() == last_key.0.user_key.as_ref()
                    {
                        self.item_buffer.clear();
                    }
                    continue;
                }
            };

            if let Some(last_key) = self.item_buffer.front() {
                if key.user_key.as_ref() != last_key.0.user_key.as_ref() {
                    self.item_buffer.push_front((key, v));
                    break;
                } else {
                    self.item_buffer.pop_front();
                    self.item_buffer.push_front((key, v));
                }
            } else {
                self.item_buffer.push_front((key, v));
            }
        }
        Ok(())
    }
}

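/// Iterator over the change log of an epoch range, implemented as a
/// merge-join of two snapshot iterators over the same key range:
/// `new_value_iter` reads at the upper epoch (inclusive) and
/// `old_value_iter` reads strictly below the lower epoch. Keys present only
/// on the new side become inserts, keys only on the old side become deletes,
/// and keys on both sides with differing values become updates.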
pub struct RangeKvStateStoreChangeLogIter<R: RangeKv> {
    new_value_iter: RangeKvStateStoreIter<R>,
    old_value_iter: RangeKvStateStoreIter<R>,
    item_buffer: Option<(TableKey<Bytes>, ChangeLogValue<Bytes>)>,
}

impl<R: RangeKv> RangeKvStateStoreChangeLogIter<R> {
    fn new(
        mut new_value_iter: RangeKvStateStoreIter<R>,
        mut old_value_iter: RangeKvStateStoreIter<R>,
    ) -> StorageResult<Self> {
        new_value_iter.next_inner()?;
        old_value_iter.next_inner()?;
        Ok(Self {
            new_value_iter,
            old_value_iter,
            item_buffer: None,
        })
    }
}

impl<R: RangeKv> StateStoreIter<StateStoreReadLogItem> for RangeKvStateStoreChangeLogIter<R> {
    async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
        loop {
            match (
                &self.new_value_iter.item_buffer,
                &self.old_value_iter.item_buffer,
            ) {
                (None, None) => {
                    self.item_buffer = None;
                    break;
                }
                (Some((key, new_value)), None) => {
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Insert(new_value.clone()),
                    ));
                    self.new_value_iter.next_inner()?;
                }
                (None, Some((key, old_value))) => {
                    self.item_buffer = Some((
                        key.user_key.table_key.clone(),
                        ChangeLogValue::Delete(old_value.clone()),
                    ));
                    self.old_value_iter.next_inner()?;
                }
                (Some((new_value_key, new_value)), Some((old_value_key, old_value))) => {
                    match new_value_key.user_key.cmp(&old_value_key.user_key) {
                        Ordering::Less => {
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Insert(new_value.clone()),
                            ));
                            self.new_value_iter.next_inner()?;
                        }
                        Ordering::Greater => {
                            self.item_buffer = Some((
                                old_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Delete(old_value.clone()),
                            ));
                            self.old_value_iter.next_inner()?;
                        }
                        Ordering::Equal => {
                            if new_value == old_value {
                                self.new_value_iter.next_inner()?;
                                self.old_value_iter.next_inner()?;
                                continue;
                            }
                            self.item_buffer = Some((
                                new_value_key.user_key.table_key.clone(),
                                ChangeLogValue::Update {
                                    new_value: new_value.clone(),
                                    old_value: old_value.clone(),
                                },
                            ));
                            self.new_value_iter.next_inner()?;
                            self.old_value_iter.next_inner()?;
                        }
                    }
                }
            };
            break;
        }
        Ok(self
            .item_buffer
            .as_ref()
            .map(|(key, value)| (key.to_ref(), value.to_ref())))
    }
}

#[cfg(test)]
mod tests {
    use risingwave_common::util::epoch::test_epoch;

    use super::*;
    use crate::hummock::iterator::test_utils::{
        iterator_test_table_key_of, iterator_test_value_of,
    };
    use crate::hummock::test_utils::{ReadOptions, *};
    use crate::memory::sled::SledStateStore;

    #[tokio::test]
    async fn test_snapshot_isolation_memory() {
        let state_store = MemoryStateStore::new();
        test_snapshot_isolation_inner(state_store).await;
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_snapshot_isolation_sled() {
        let state_store = SledStateStore::new_temp();
        test_snapshot_isolation_inner(state_store).await;
    }

    async fn test_snapshot_isolation_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_put(b"v1".to_vec()),
                    ),
                ],
                vec![],
                0,
                Default::default(),
            )
            .unwrap();
        state_store
            .ingest_batch(
                vec![
                    (
                        TableKey(Bytes::from(b"a".to_vec())),
                        StorageValue::new_put(b"v2".to_vec()),
                    ),
                    (
                        TableKey(Bytes::from(b"b".to_vec())),
                        StorageValue::new_delete(),
                    ),
                ],
                vec![],
                test_epoch(1),
                Default::default(),
            )
            .unwrap();
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![
                (
                    FullKey::for_test(Default::default(), Bytes::from("a"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                ),
                (
                    FullKey::for_test(Default::default(), Bytes::from("b"), 0)
                        .encode()
                        .into(),
                    b"v1".to_vec().into()
                )
            ]
        );
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    0,
                    TableId::default(),
                    Some(1),
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), 0)
                    .encode()
                    .into(),
                b"v1".to_vec().into()
            )]
        );
        assert_eq!(
            state_store
                .scan(
                    (
                        Bound::Included(TableKey(Bytes::from("a"))),
                        Bound::Included(TableKey(Bytes::from("b"))),
                    ),
                    test_epoch(1),
                    TableId::default(),
                    None,
                )
                .unwrap(),
            vec![(
                FullKey::for_test(Default::default(), b"a".to_vec(), test_epoch(1))
                    .encode()
                    .into(),
                b"v2".to_vec().into()
            )]
        );
        assert_eq!(
            state_store
                .get(TableKey(Bytes::from("a")), 0, ReadOptions::default())
                .await
                .unwrap(),
            Some(Bytes::from("v1"))
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"b")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v1".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"c")),
                    0,
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::copy_from_slice(b"a")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            Some(b"v2".to_vec().into())
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("b")),
                    test_epoch(1),
                    ReadOptions::default(),
                )
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            state_store
                .get(
                    TableKey(Bytes::from("c")),
                    test_epoch(1),
                    ReadOptions::default()
                )
                .await
                .unwrap(),
            None
        );
    }

    #[tokio::test]
    async fn test_iter_log_memory() {
        let state_store = MemoryStateStore::new();
        test_iter_log_inner(state_store).await;
    }

    #[cfg(not(madsim))]
    #[tokio::test]
    async fn test_iter_log_sled() {
        let state_store = SledStateStore::new_temp();
        test_iter_log_inner(state_store).await;
    }

    async fn test_iter_log_inner(state_store: RangeKvStateStore<impl RangeKv>) {
        let table_id = TableId::new(233);
        let epoch1 = test_epoch(1);
        let key_idx = [1, 2, 4];
        let make_key = |i| TableKey(Bytes::from(iterator_test_table_key_of(i)));
        let make_value = |i| Bytes::from(iterator_test_value_of(i));
        state_store
            .ingest_batch(
                key_idx
                    .iter()
                    .map(|i| (make_key(*i), StorageValue::new_put(make_value(*i))))
                    .collect(),
                vec![],
                epoch1,
                table_id,
            )
            .unwrap();
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }

        let epoch2 = test_epoch(2);
        state_store
            .ingest_batch(
                vec![
                    (make_key(1), StorageValue::new_put(make_value(12))),
                    (make_key(2), StorageValue::new_delete()),
                    (make_key(3), StorageValue::new_put(make_value(3))),
                ],
                vec![],
                epoch2,
                table_id,
            )
            .unwrap();

        {
            let expected = vec![
                (
                    make_key(1),
                    ChangeLogValue::Update {
                        new_value: make_value(12),
                        old_value: make_value(1),
                    },
                ),
                (make_key(2), ChangeLogValue::Delete(make_value(2))),
                (make_key(3), ChangeLogValue::Insert(make_value(3))),
            ];
            let mut iter = state_store
                .iter_log(
                    (epoch2, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for (key, change_log_value) in expected {
                let (iter_key, iter_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(
                    key.to_ref(),
                    iter_key,
                    "{:?} {:?}",
                    change_log_value.to_ref(),
                    iter_value
                );
                assert_eq!(change_log_value.to_ref(), iter_value);
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch1),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            for i in key_idx {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
        {
            let mut iter = state_store
                .iter_log(
                    (epoch1, epoch2),
                    (Unbounded, Unbounded),
                    ReadLogOptions { table_id },
                )
                .await
                .unwrap();
            let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
            assert_eq!(make_key(1).to_ref(), iter_key);
            assert_eq!(
                change_value,
                ChangeLogValue::Insert(make_value(12).as_ref())
            );
            for i in [3, 4] {
                let (iter_key, change_value) = iter.try_next().await.unwrap().unwrap();
                assert_eq!(make_key(i).to_ref(), iter_key);
                assert_eq!(change_value, ChangeLogValue::Insert(make_value(i).as_ref()));
            }
            assert!(iter.try_next().await.unwrap().is_none());
        }
    }
}