1use std::cmp::Ordering;
16use std::ops::Bound::{Excluded, Included, Unbounded};
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::must_match;
20use risingwave_common::util::epoch::MAX_SPILL_TIMES;
21use risingwave_hummock_sdk::EpochWithGap;
22use risingwave_hummock_sdk::key::{
23 FullKey, SetSlice, TableKeyRange, UserKey, UserKeyRange, bound_table_key_range,
24};
25
26use crate::StateStoreIter;
27use crate::error::StorageResult;
28use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator};
29use crate::hummock::value::HummockValue;
30use crate::hummock::{HummockResult, SstableIterator};
31use crate::monitor::IterLocalMetricsGuard;
32use crate::store::{ChangeLogValue, StateStoreReadLogItem, StateStoreReadLogItemRef};
33
/// Merges a new-value iterator and an old-value iterator into per-key
/// change-log entries (`Insert` / `Update` / `Delete`) restricted to an
/// inclusive epoch range and a user-key range.
struct ChangeLogIteratorInner<
    NI: HummockIterator<Direction = Forward>,
    OI: HummockIterator<Direction = Forward>,
> {
    /// Iterator over the new values written within the epoch range.
    new_value_iter: NI,
    /// Iterator over the old values that were overwritten within the epoch range.
    old_value_iter: OI,
    /// Upper bound (inclusive) of the epoch range to read.
    max_epoch: u64,
    /// Lower bound (inclusive) of the epoch range to read.
    min_epoch: u64,
    /// User-key range restriction applied on top of both iterators.
    key_range: UserKeyRange,

    /// Full key of the current position (owned buffer, reused across advances).
    curr_key: FullKey<Vec<u8>>,
    /// Buffered new value of the current key (meaningful only when
    /// `is_new_value_delete` is false).
    new_value: Vec<u8>,
    /// Whether the current key's new value is a delete tombstone.
    is_new_value_delete: bool,

    /// Whether `old_value_iter` is currently positioned at the old value
    /// belonging to `curr_key`.
    is_old_value_set: bool,

    /// Whether the iterator currently points at a valid position.
    is_current_pos_valid: bool,
}
68
impl<NI: HummockIterator<Direction = Forward>, OI: HummockIterator<Direction = Forward>>
    ChangeLogIteratorInner<NI, OI>
{
    /// Create an iterator over the inclusive epoch range `(min_epoch, max_epoch)`
    /// and `key_range`. The iterator is unpositioned until [`Self::rewind`] is called.
    fn new(
        (min_epoch, max_epoch): (u64, u64),
        key_range: UserKeyRange,
        new_value_iter: NI,
        old_value_iter: OI,
    ) -> Self {
        Self {
            new_value_iter,
            old_value_iter,
            min_epoch,
            max_epoch,
            key_range,

            curr_key: FullKey::default(),
            new_value: vec![],
            is_new_value_delete: false,
            is_old_value_set: false,
            is_current_pos_valid: false,
        }
    }

    /// Position both underlying iterators at the start of the key range and
    /// advance to the first change-log entry (if any). Check [`Self::is_valid`]
    /// afterwards.
    pub async fn rewind(&mut self) -> HummockResult<()> {
        match &self.key_range.0 {
            Included(begin_key) => {
                // Seek with `max_epoch`: within a user key, entries are ordered
                // by descending epoch (see the assert in
                // `advance_to_find_oldest_epoch`), so this lands on the newest
                // entry that can fall inside the epoch window.
                let full_key = FullKey {
                    user_key: begin_key.as_ref(),
                    epoch_with_gap: EpochWithGap::new(self.max_epoch, MAX_SPILL_TIMES),
                };
                self.new_value_iter.seek(full_key).await?;
                self.old_value_iter.seek(full_key).await?;
            }
            Excluded(_) => unimplemented!("excluded begin key is not supported"),
            Unbounded => {
                self.new_value_iter.rewind().await?;
                self.old_value_iter.rewind().await?;
            }
        };

        self.try_advance_to_next_change_log_value().await?;
        Ok(())
    }

    /// Advance to the next change-log entry. Check [`Self::is_valid`] afterwards.
    pub async fn next(&mut self) -> HummockResult<()> {
        self.try_advance_to_next_change_log_value().await
    }

    /// Whether the iterator is positioned at a valid change-log entry.
    pub fn is_valid(&self) -> bool {
        self.is_current_pos_valid
    }

    /// Return the change-log value at the current position.
    ///
    /// # Panics
    /// Panics if the new value is a delete without a matching old value;
    /// `try_advance_to_next_change_log_value` skips such positions
    /// (see `has_log_value`), so this cannot happen at a valid position.
    pub fn log_value(&self) -> ChangeLogValue<&[u8]> {
        if self.is_new_value_delete {
            ChangeLogValue::Delete(
                self.old_value()
                    .expect("should have old value when new value is delete"),
            )
        } else {
            match self.old_value() {
                // Key existed before the epoch range: an overwrite.
                Some(old_value) => ChangeLogValue::Update {
                    new_value: self.new_value.as_slice(),
                    old_value,
                },
                // No prior value: a fresh insert.
                None => ChangeLogValue::Insert(self.new_value.as_slice()),
            }
        }
    }

    /// Return the user key at the current position.
    pub fn key(&self) -> UserKey<&[u8]> {
        self.curr_key.user_key.as_ref()
    }
}
145
impl<NI: HummockIterator<Direction = Forward>, OI: HummockIterator<Direction = Forward>>
    ChangeLogIteratorInner<NI, OI>
{
    /// Keep advancing until the position yields an observable change-log value,
    /// or the iterator becomes invalid.
    async fn try_advance_to_next_change_log_value(&mut self) -> HummockResult<()> {
        loop {
            self.try_advance_to_next_valid().await?;
            if !self.is_valid() {
                break;
            }
            // Skip positions with no log value: a delete with no old value,
            // i.e. a key that was both inserted and deleted inside the
            // epoch range.
            if self.has_log_value() {
                break;
            } else {
                continue;
            }
        }
        Ok(())
    }

    /// Whether `user_key` lies beyond the end bound of `key_range`.
    fn user_key_out_of_range(&self, user_key: UserKey<&[u8]>) -> bool {
        match &self.key_range.1 {
            Included(end_key) => user_key > end_key.as_ref(),
            Excluded(end_key) => user_key >= end_key.as_ref(),
            Unbounded => false,
        }
    }

    /// Advance `new_value_iter` to the next entry whose epoch lies in
    /// `[min_epoch, max_epoch]` and whose key is within range, then buffer
    /// its key and value. Leaves `is_current_pos_valid` false when the
    /// iterator is exhausted or the key range is exceeded.
    async fn advance_to_valid_key(&mut self) -> HummockResult<()> {
        self.is_current_pos_valid = false;
        loop {
            if !self.new_value_iter.is_valid() {
                return Ok(());
            }

            let key = self.new_value_iter.key();

            // Entries outside the epoch window are skipped, not terminal:
            // a later user key may still fall inside the window.
            if !self.is_valid_epoch(key.epoch_with_gap) {
                self.new_value_iter.next().await?;
                continue;
            }

            // Keys are visited in ascending order, so going past the end
            // bound means no further entries can match.
            if self.user_key_out_of_range(key.user_key) {
                return Ok(());
            }

            break;
        }

        debug_assert!(self.new_value_iter.is_valid());
        debug_assert!(self.is_valid_epoch(self.new_value_iter.key().epoch_with_gap));
        debug_assert!(!self.user_key_out_of_range(self.new_value_iter.key().user_key));
        self.is_current_pos_valid = true;
        // Buffer the key/value so the underlying iterator can keep moving.
        self.curr_key.set(self.new_value_iter.key());
        match self.new_value_iter.value() {
            HummockValue::Put(val) => {
                self.new_value.set(val);
                self.is_new_value_delete = false;
            }
            HummockValue::Delete => {
                self.new_value.clear();
                self.is_new_value_delete = true;
            }
        }

        Ok(())
    }

    /// Starting from the entry buffered in `curr_key`, advance `new_value_iter`
    /// past all remaining in-range entries of the same user key and return the
    /// oldest in-range epoch of that key. On return, `new_value_iter` stands at
    /// the first entry of the next user key (or an out-of-range epoch, or the end).
    async fn advance_to_find_oldest_epoch(&mut self) -> HummockResult<EpochWithGap> {
        let mut ret = self.curr_key.epoch_with_gap;
        debug_assert!(self.is_valid_epoch(ret));
        self.new_value_iter.next().await?;
        loop {
            if !self.new_value_iter.is_valid() {
                break;
            }
            let key = self.new_value_iter.key();
            match self.curr_key.user_key.as_ref().cmp(&key.user_key) {
                Ordering::Less => {
                    // Moved on to a greater user key: done with the current key.
                    break;
                }
                Ordering::Equal => {
                    // Within a user key, epochs must be strictly descending.
                    assert!(ret > key.epoch_with_gap);
                    if !self.is_valid_epoch(key.epoch_with_gap) {
                        // Descending order means this can only be below min_epoch.
                        debug_assert!(self.min_epoch > key.epoch_with_gap.pure_epoch());
                        break;
                    }
                    ret = key.epoch_with_gap;
                    self.new_value_iter.next().await?;
                    continue;
                }
                Ordering::Greater => {
                    unreachable!(
                        "hummock iterator advance to a prev key: {:?} {:?}",
                        self.curr_key,
                        self.new_value_iter.key()
                    );
                }
            }
        }
        debug_assert!(self.is_valid_epoch(ret));

        Ok(ret)
    }

    /// Advance to the next in-range key and align `old_value_iter` with it:
    /// after this call, `is_old_value_set` tells whether the current key has an
    /// old value recorded at exactly its oldest in-range epoch.
    async fn try_advance_to_next_valid(&mut self) -> HummockResult<()> {
        self.advance_to_valid_key().await?;

        if !self.is_current_pos_valid {
            return Ok(());
        }

        let oldest_epoch = self.advance_to_find_oldest_epoch().await?;

        // Match the old value: it is only valid if it was written at the
        // oldest in-range epoch of the current key.
        self.is_old_value_set = false;
        loop {
            if !self.old_value_iter.is_valid() {
                break;
            }

            let old_value_iter_key = self.old_value_iter.key();
            match self
                .curr_key
                .user_key
                .as_ref()
                .cmp(&old_value_iter_key.user_key.as_ref())
            {
                Ordering::Less => {
                    // Old-value iterator is already past the current key:
                    // no old value for this key.
                    break;
                }
                Ordering::Equal => match old_value_iter_key.epoch_with_gap.cmp(&oldest_epoch) {
                    Ordering::Less => {
                        // Any old value older than the oldest new value must
                        // predate the epoch window entirely.
                        assert!(
                            old_value_iter_key.epoch_with_gap.pure_epoch() < self.min_epoch,
                            "there should not be old value between oldest new_value and min_epoch. \
                            new value key: {:?}, oldest epoch: {:?}, min epoch: {:?}, old value epoch: {:?}",
                            self.curr_key,
                            oldest_epoch,
                            self.min_epoch,
                            old_value_iter_key.epoch_with_gap
                        );
                        break;
                    }
                    Ordering::Equal => {
                        self.is_old_value_set = true;
                        break;
                    }
                    Ordering::Greater => {
                        // Newer-than-needed old value of the same key: skip.
                        self.old_value_iter.next().await?;
                        continue;
                    }
                },
                Ordering::Greater => {
                    // Old-value iterator is behind the current key: catch up.
                    self.old_value_iter.next().await?;
                    continue;
                }
            }
        }

        Ok(())
    }

    /// Whether `epoch` falls inside the inclusive `[min_epoch, max_epoch]` window.
    fn is_valid_epoch(&self, epoch: EpochWithGap) -> bool {
        let epoch = epoch.pure_epoch();
        self.min_epoch <= epoch && epoch <= self.max_epoch
    }

    /// Old value of the current key, if `old_value_iter` was aligned to it.
    /// Old values are always stored as `Put` (tombstones make no sense here).
    fn old_value(&self) -> Option<&[u8]> {
        if self.is_old_value_set {
            debug_assert!(self.old_value_iter.is_valid());
            debug_assert_eq!(
                self.old_value_iter.key().user_key,
                self.curr_key.user_key.as_ref()
            );
            Some(must_match!(self.old_value_iter.value(), HummockValue::Put(val) => val))
        } else {
            None
        }
    }

    /// Whether the current position yields an observable change-log value.
    /// Only a delete with no old value (insert + delete within the window)
    /// produces none.
    fn has_log_value(&self) -> bool {
        debug_assert!(self.is_current_pos_valid);
        !self.is_new_value_delete || self.is_old_value_set
    }
}
347
348impl Drop for ChangeLogIterator {
349 fn drop(&mut self) {
350 self.inner
351 .new_value_iter
352 .collect_local_statistic(&mut self.stats_guard.local_stats);
353 self.inner
354 .old_value_iter
355 .collect_local_statistic(&mut self.stats_guard.local_stats);
356 }
357}
358
/// Public change-log iterator over SST-backed new-value and old-value streams,
/// consumed through its [`StateStoreIter`] implementation.
pub struct ChangeLogIterator {
    /// Underlying merge-based change-log iterator.
    inner: ChangeLogIteratorInner<MergeIterator<SstableIterator>, MergeIterator<SstableIterator>>,
    /// `inner` is already positioned by `rewind` at construction time, so the
    /// first `try_next` call must not advance it; set to true after that call.
    initial_read: bool,
    /// Holds local iterator metrics; statistics are collected into it on drop.
    stats_guard: IterLocalMetricsGuard,
}
364
365impl ChangeLogIterator {
366 pub async fn new(
367 epoch_range: (u64, u64),
368 table_key_range: TableKeyRange,
369 new_value_iter: MergeIterator<SstableIterator>,
370 old_value_iter: MergeIterator<SstableIterator>,
371 table_id: TableId,
372 stats_guard: IterLocalMetricsGuard,
373 ) -> HummockResult<Self> {
374 let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
375 let (start_bound, end_bound) = (
376 user_key_range_ref.0.map(|key| key.cloned()),
377 user_key_range_ref.1.map(|key| key.cloned()),
378 );
379 let mut inner = ChangeLogIteratorInner::new(
380 epoch_range,
381 (start_bound, end_bound),
382 new_value_iter,
383 old_value_iter,
384 );
385 inner.rewind().await?;
386 Ok(Self {
387 inner,
388 initial_read: false,
389 stats_guard,
390 })
391 }
392}
393
394impl StateStoreIter<StateStoreReadLogItem> for ChangeLogIterator {
395 async fn try_next(&mut self) -> StorageResult<Option<StateStoreReadLogItemRef<'_>>> {
396 if !self.initial_read {
397 self.initial_read = true;
398 } else {
399 self.inner.next().await?;
400 }
401 if self.inner.is_valid() {
402 Ok(Some((self.inner.key().table_key, self.inner.log_value())))
403 } else {
404 Ok(None)
405 }
406 }
407}
408
#[cfg(any(test, feature = "test"))]
pub mod test_utils {
    use std::collections::HashMap;

    use bytes::Bytes;
    use rand::{Rng, RngCore, rng as thread_rng};
    use risingwave_common::util::epoch::{EpochPair, MAX_EPOCH, test_epoch};
    use risingwave_hummock_sdk::key::TableKey;

    use crate::hummock::iterator::test_utils::iterator_test_table_key_of;
    use crate::mem_table::KeyOp;
    use crate::store::{InitOptions, LocalStateStore, SealCurrentEpochOptions};

    /// Test log data: a list of `(epoch, ops-applied-in-that-epoch)` pairs.
    pub type TestLogDataType = Vec<(u64, Vec<(TableKey<Bytes>, KeyOp)>)>;

    /// Generate `epoch_count` epochs of random key ops over `key_count` keys.
    ///
    /// For the first `epoch_count - 1` epochs, each key is skipped with
    /// probability `skip_ratio`; a touched key is deleted with probability
    /// `delete_ratio` (only if it currently exists), otherwise inserted or
    /// updated with a random value. The final epoch deletes every key still
    /// present, so the log ends with an empty table.
    ///
    /// NOTE(review): assumes `epoch_count >= 1`; `epoch_count - 1` would
    /// underflow for 0 since the count is a `usize`.
    pub fn gen_test_data(
        epoch_count: usize,
        key_count: usize,
        skip_ratio: f64,
        delete_ratio: f64,
    ) -> TestLogDataType {
        // Shadow copy of the live table content, used to derive the correct
        // old value for `Update`/`Delete` ops.
        let mut store: HashMap<TableKey<Bytes>, Bytes> = HashMap::new();
        let mut rng = thread_rng();
        let mut logs = Vec::new();
        for epoch_idx in 1..=(epoch_count - 1) {
            let mut epoch_logs = Vec::new();
            let epoch = test_epoch(epoch_idx as _);
            for key_idx in 0..key_count {
                if rng.random_bool(skip_ratio) {
                    continue;
                }
                let key = TableKey(Bytes::from(iterator_test_table_key_of(key_idx)));
                if rng.random_bool(delete_ratio) {
                    // Deleting a non-existent key is a no-op (no log entry).
                    if let Some(prev_value) = store.remove(&key) {
                        epoch_logs.push((key, KeyOp::Delete(prev_value)));
                    }
                } else {
                    let value = Bytes::copy_from_slice(rng.next_u64().to_string().as_bytes());
                    let prev_value = store.get(&key);
                    if let Some(prev_value) = prev_value {
                        epoch_logs.push((
                            key.clone(),
                            KeyOp::Update((prev_value.clone(), value.clone())),
                        ));
                    } else {
                        epoch_logs.push((key.clone(), KeyOp::Insert(value.clone())));
                    }
                    store.insert(key, value);
                }
            }
            logs.push((epoch, epoch_logs));
        }
        // Final epoch: delete all remaining keys.
        {
            let mut epoch_logs = Vec::new();
            let epoch = test_epoch(epoch_count as _);
            for (key, value) in store {
                epoch_logs.push((key, KeyOp::Delete(value)));
            }
            logs.push((epoch, epoch_logs));
        }
        logs
    }

    /// Replay `log_data` into a local state store, initializing on the first
    /// epoch and flush + seal between epochs. After each op, `try_flush` is
    /// called with probability `try_flush_ratio`. Ends with a final flush and
    /// a seal at `MAX_EPOCH`.
    ///
    /// NOTE(review): panics if `log_data` is empty (indexes `log_data[0]`).
    pub async fn apply_test_log_data(
        log_data: TestLogDataType,
        state_store: &mut impl LocalStateStore,
        try_flush_ratio: f64,
    ) {
        let mut rng = thread_rng();
        let first_epoch = log_data[0].0;
        for (epoch, epoch_logs) in log_data {
            if epoch == first_epoch {
                state_store
                    .init(InitOptions {
                        epoch: EpochPair::new_test_epoch(epoch),
                    })
                    .await
                    .unwrap();
            } else {
                state_store.flush().await.unwrap();
                state_store.seal_current_epoch(
                    epoch,
                    SealCurrentEpochOptions {
                        table_watermarks: None,
                        switch_op_consistency_level: None,
                    },
                );
            }
            for (key, op) in epoch_logs {
                match op {
                    KeyOp::Insert(value) => {
                        state_store.insert(key, value, None).unwrap();
                    }
                    KeyOp::Delete(old_value) => {
                        state_store.delete(key, old_value).unwrap();
                    }
                    KeyOp::Update((old_value, value)) => {
                        state_store.insert(key, value, Some(old_value)).unwrap();
                    }
                }
                if rng.random_bool(try_flush_ratio) {
                    state_store.try_flush().await.unwrap();
                }
            }
        }
        state_store.flush().await.unwrap();
        state_store.seal_current_epoch(
            MAX_EPOCH,
            SealCurrentEpochOptions {
                table_watermarks: None,
                switch_op_consistency_level: None,
            },
        );
    }
}
525
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::ops::Bound::Unbounded;

    use bytes::Bytes;
    use itertools::Itertools;
    use risingwave_common::bitmap::Bitmap;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::id::FragmentId;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::{TableKey, UserKey};

    use crate::hummock::iterator::MergeIterator;
    use crate::hummock::iterator::change_log::ChangeLogIteratorInner;
    use crate::hummock::iterator::change_log::test_utils::{
        TestLogDataType, apply_test_log_data, gen_test_data,
    };
    use crate::hummock::iterator::test_utils::{
        iterator_test_table_key_of, iterator_test_value_of,
    };
    use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator, MemTableStore};
    use crate::memory::MemoryStateStore;
    use crate::store::{
        CHECK_BYTES_EQUAL, ChangeLogValue, NewLocalOptions, OpConsistencyLevel, ReadLogOptions,
        StateStoreReadLog,
    };
    use crate::{StateStore, StateStoreIter};

    /// Empty new-value and old-value inputs: the iterator must be invalid
    /// right after `rewind`.
    #[tokio::test]
    async fn test_empty() {
        let table_id = TableId::new(233);
        let epoch = EpochWithGap::new_from_epoch(test_epoch(1));
        let empty = BTreeMap::new();
        let new_value_iter = MemTableHummockIterator::new(&empty, epoch, table_id);
        let old_value_iter = MemTableHummockIterator::new(&empty, epoch, table_id);
        let mut iter = ChangeLogIteratorInner::new(
            (epoch.pure_epoch(), epoch.pure_epoch()),
            (Unbounded, Unbounded),
            new_value_iter,
            old_value_iter,
        );
        iter.rewind().await.unwrap();
        assert!(!iter.is_valid());
    }

    /// Only inserts (no old values): every entry must surface as
    /// `ChangeLogValue::Insert`, in key order.
    #[tokio::test]
    async fn test_append_only() {
        let table_id = TableId::new(233);

        let count = 100;
        let kvs = (0..count)
            .map(|i| {
                (
                    TableKey(Bytes::from(iterator_test_table_key_of(i))),
                    Bytes::from(iterator_test_value_of(i)),
                )
            })
            .collect_vec();
        // One single-entry mem-table per key, merged into one new-value stream.
        let mem_tables = kvs
            .iter()
            .map(|(key, value)| {
                let mut t = MemTable::new(233.into(), OpConsistencyLevel::Inconsistent);
                t.insert(key.clone(), value.clone()).unwrap();
                t
            })
            .collect_vec();
        let epoch = EpochWithGap::new_from_epoch(test_epoch(1));
        let new_value_iter = MergeIterator::new(
            mem_tables
                .iter()
                .map(|mem_table| MemTableHummockIterator::new(&mem_table.buffer, epoch, table_id)),
        );
        let empty = BTreeMap::new();
        let old_value_iter = MemTableHummockIterator::new(&empty, epoch, table_id);
        let mut iter = ChangeLogIteratorInner::new(
            (epoch.pure_epoch(), epoch.pure_epoch()),
            (Unbounded, Unbounded),
            new_value_iter,
            old_value_iter,
        );
        iter.rewind().await.unwrap();
        for (key, value) in kvs {
            assert!(iter.is_valid());
            assert_eq!(
                UserKey {
                    table_id,
                    table_key: key.to_ref(),
                },
                iter.key()
            );
            assert_eq!(ChangeLogValue::Insert(value.as_ref()), iter.log_value());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }

    /// Only deletes (with matching old values): every entry must surface as
    /// `ChangeLogValue::Delete` carrying the old value.
    #[tokio::test]
    async fn test_delete_only() {
        let table_id = TableId::new(233);

        let count = 100;
        let kvs = (0..count)
            .map(|i| {
                (
                    TableKey(Bytes::from(iterator_test_table_key_of(i))),
                    Bytes::from(iterator_test_value_of(i)),
                )
            })
            .collect_vec();
        let mut new_value_memtable = MemTable::new(233.into(), OpConsistencyLevel::Inconsistent);
        let mut old_value_memtable = MemTable::new(233.into(), OpConsistencyLevel::Inconsistent);
        for (key, value) in &kvs {
            new_value_memtable
                .delete(key.clone(), Bytes::new())
                .unwrap();
            old_value_memtable
                .insert(key.clone(), value.clone())
                .unwrap();
        }
        let epoch = EpochWithGap::new_from_epoch(test_epoch(1));
        let new_value_iter =
            MemTableHummockIterator::new(&new_value_memtable.buffer, epoch, table_id);
        let old_value_iter =
            MemTableHummockIterator::new(&old_value_memtable.buffer, epoch, table_id);
        let mut iter = ChangeLogIteratorInner::new(
            (epoch.pure_epoch(), epoch.pure_epoch()),
            (Unbounded, Unbounded),
            new_value_iter,
            old_value_iter,
        );
        iter.rewind().await.unwrap();
        for (key, value) in kvs {
            assert!(iter.is_valid());
            assert_eq!(
                UserKey {
                    table_id,
                    table_key: key.to_ref(),
                },
                iter.key()
            );
            assert_eq!(ChangeLogValue::Delete(value.as_ref()), iter.log_value());
            iter.next().await.unwrap();
        }
        assert!(!iter.is_valid());
    }

    /// Convert test log data into per-epoch (new-value, old-value) mem-table
    /// pairs, mirroring how change-log data is laid out for the iterator.
    fn gen_test_mem_table_store(
        test_log_data: TestLogDataType,
    ) -> Vec<(u64, MemTableStore, MemTableStore)> {
        let mut logs = Vec::new();
        for (epoch, epoch_logs) in test_log_data {
            let mut new_values = MemTableStore::new();
            let mut old_values = MemTableStore::new();
            for (key, op) in epoch_logs {
                new_values.insert(key.clone(), op.clone());
                // Only ops that overwrite something contribute an old value.
                if let KeyOp::Delete(old_value) | KeyOp::Update((old_value, _)) = op {
                    old_values.insert(key, KeyOp::Insert(old_value));
                }
            }
            logs.push((epoch, new_values, old_values));
        }
        logs
    }

    /// Randomized cross-check: for every `(start_epoch, end_epoch)` window,
    /// the mem-table-backed `ChangeLogIteratorInner` must produce exactly the
    /// same entries as `MemoryStateStore::iter_log` over the same data.
    #[tokio::test]
    async fn test_random_data() {
        let table_id = TableId::new(233);
        let epoch_count = 10;
        let state_store = MemoryStateStore::new();
        let mut local = state_store
            .new_local(NewLocalOptions {
                table_id,
                fragment_id: FragmentId::default(),
                op_consistency_level: OpConsistencyLevel::ConsistentOldValue {
                    check_old_value: CHECK_BYTES_EQUAL.clone(),
                    is_log_store: true,
                },
                table_option: Default::default(),
                is_replicated: false,
                vnodes: Bitmap::ones(VirtualNode::COUNT_FOR_TEST).into(),
                upload_on_flush: true,
            })
            .await;
        let logs = gen_test_data(epoch_count, 10000, 0.05, 0.2);
        assert_eq!(logs.len(), epoch_count);
        apply_test_log_data(logs.clone(), &mut local, 0.0).await;
        let mem_table_logs = gen_test_mem_table_store(logs.clone());
        assert_eq!(mem_table_logs.len(), epoch_count);
        for start_epoch_idx in 0..epoch_count {
            for end_epoch_idx in start_epoch_idx..epoch_count {
                let new_value_iter = MergeIterator::new(mem_table_logs.iter().map(
                    |(epoch, new_value_memtable, _)| {
                        MemTableHummockIterator::new(
                            new_value_memtable,
                            EpochWithGap::new_from_epoch(*epoch),
                            table_id,
                        )
                    },
                ));
                let old_value_iter = MergeIterator::new(mem_table_logs.iter().map(
                    |(epoch, _, old_value_memtable)| {
                        MemTableHummockIterator::new(
                            old_value_memtable,
                            EpochWithGap::new_from_epoch(*epoch),
                            table_id,
                        )
                    },
                ));
                let epoch_range = (logs[start_epoch_idx].0, logs[end_epoch_idx].0);
                let mut change_log_iter = ChangeLogIteratorInner::new(
                    epoch_range,
                    (Unbounded, Unbounded),
                    new_value_iter,
                    old_value_iter,
                );
                change_log_iter.rewind().await.unwrap();
                let mut expected_change_log_iter = state_store
                    .iter_log(
                        epoch_range,
                        (Unbounded, Unbounded),
                        ReadLogOptions { table_id },
                    )
                    .await
                    .unwrap();
                // Compare entry-by-entry against the reference iterator.
                while let Some((key, change_log_value)) =
                    expected_change_log_iter.try_next().await.unwrap()
                {
                    assert!(change_log_iter.is_valid());
                    assert_eq!(
                        change_log_iter.key(),
                        UserKey {
                            table_id,
                            table_key: key,
                        },
                    );
                    assert_eq!(change_log_iter.log_value(), change_log_value);
                    change_log_iter.next().await.unwrap();
                }
                assert!(!change_log_iter.is_valid());
            }
        }
    }
}