1use std::cmp::Ordering::{Equal, Less};
16use std::sync::Arc;
17
18use foyer::Hint;
19use risingwave_hummock_sdk::key::FullKey;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21
22use crate::hummock::iterator::{Backward, HummockIterator, ValueMeta};
23use crate::hummock::sstable::SstableIteratorReadOptions;
24use crate::hummock::value::HummockValue;
25use crate::hummock::{
26 BlockIterator, HummockResult, SstableIteratorType, SstableStoreRef, TableHolder,
27};
28use crate::monitor::StoreLocalStatistic;
29
30pub struct BackwardSstableIterator {
32 block_iter: Option<BlockIterator>,
34
35 cur_idx: usize,
37
38 sst: TableHolder,
40
41 sstable_store: SstableStoreRef,
42
43 stats: StoreLocalStatistic,
44
45 read_block_meta_range: (usize, usize),
47}
48
49impl BackwardSstableIterator {
50 pub fn new(
51 sstable: TableHolder,
52 sstable_store: SstableStoreRef,
53 sstable_info_ref: &SstableInfo,
54 ) -> Self {
55 let mut start_idx = 0;
56 let mut end_idx = sstable.meta.block_metas.len() - 1;
57 let read_table_id_range = (
58 *sstable_info_ref.table_ids.first().unwrap(),
59 *sstable_info_ref.table_ids.last().unwrap(),
60 );
61 assert!(
62 read_table_id_range.0 <= read_table_id_range.1,
63 "invalid table id range {} - {}",
64 read_table_id_range.0,
65 read_table_id_range.1
66 );
67 let block_meta_count = sstable.meta.block_metas.len();
68 assert!(block_meta_count > 0);
69 assert!(
70 sstable.meta.block_metas[0].table_id() <= read_table_id_range.0,
71 "table id {} not found table_ids in block_meta {:?}",
72 read_table_id_range.0,
73 sstable
74 .meta
75 .block_metas
76 .iter()
77 .map(|meta| meta.table_id())
78 .collect::<Vec<_>>()
79 );
80 assert!(
81 sstable.meta.block_metas[block_meta_count - 1].table_id() >= read_table_id_range.1,
82 "table id {} not found table_ids in block_meta {:?}",
83 read_table_id_range.1,
84 sstable
85 .meta
86 .block_metas
87 .iter()
88 .map(|meta| meta.table_id())
89 .collect::<Vec<_>>()
90 );
91
92 while start_idx < block_meta_count
93 && sstable.meta.block_metas[start_idx].table_id() < read_table_id_range.0
94 {
95 start_idx += 1;
96 }
97 assert!(
99 start_idx < block_meta_count,
100 "table id {} not found table_ids in block_meta {:?}",
101 read_table_id_range.0,
102 sstable
103 .meta
104 .block_metas
105 .iter()
106 .map(|meta| meta.table_id())
107 .collect::<Vec<_>>()
108 );
109
110 while end_idx > start_idx
111 && sstable.meta.block_metas[end_idx].table_id() > read_table_id_range.1
112 {
113 end_idx -= 1;
114 }
115 assert!(
116 end_idx >= start_idx,
117 "end_idx {} < start_idx {} block_meta_count {}",
118 end_idx,
119 start_idx,
120 block_meta_count
121 );
122
123 Self {
124 block_iter: None,
125 cur_idx: end_idx,
126 sst: sstable,
127 sstable_store,
128 stats: StoreLocalStatistic::default(),
129 read_block_meta_range: (start_idx, end_idx),
130 }
131 }
132
133 async fn seek_idx(
135 &mut self,
136 idx: isize,
137 seek_key: Option<FullKey<&[u8]>>,
138 ) -> HummockResult<()> {
139 if idx >= self.sst.block_count() as isize || idx < self.read_block_meta_range.0 as isize {
140 self.block_iter = None;
141 } else {
142 let block = self
143 .sstable_store
144 .get(
145 &self.sst,
146 idx as usize,
147 crate::hummock::CachePolicy::Fill(Hint::Normal),
148 &mut self.stats,
149 )
150 .await?;
151 let mut block_iter = BlockIterator::new(block);
152 if let Some(key) = seek_key {
153 block_iter.seek_le(key);
154 } else {
155 block_iter.seek_to_last();
156 }
157
158 self.block_iter = Some(block_iter);
159 self.cur_idx = idx as usize;
160 }
161
162 Ok(())
163 }
164}
165
166impl HummockIterator for BackwardSstableIterator {
167 type Direction = Backward;
168
169 async fn next(&mut self) -> HummockResult<()> {
170 self.stats.total_key_count += 1;
171 let block_iter = self.block_iter.as_mut().expect("no block iter");
172 if block_iter.try_prev() {
173 Ok(())
174 } else {
175 self.seek_idx(self.cur_idx as isize - 1, None).await
177 }
178 }
179
180 fn key(&self) -> FullKey<&[u8]> {
181 self.block_iter.as_ref().expect("no block iter").key()
182 }
183
184 fn value(&self) -> HummockValue<&[u8]> {
185 let raw_value = self.block_iter.as_ref().expect("no block iter").value();
186
187 HummockValue::from_slice(raw_value).expect("decode error")
188 }
189
190 fn is_valid(&self) -> bool {
191 self.block_iter.as_ref().is_some_and(|i| i.is_valid())
192 }
193
194 async fn rewind(&mut self) -> HummockResult<()> {
197 self.seek_idx(self.read_block_meta_range.1 as isize, None)
198 .await
199 }
200
201 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
202 let block_idx = self
203 .sst
204 .meta
205 .block_metas
206 .partition_point(|block_meta| {
207 let ord = FullKey::decode(&block_meta.smallest_key).cmp(&key);
211 ord == Less || ord == Equal
212 })
213 .saturating_sub(1); let block_idx = block_idx as isize;
215
216 self.seek_idx(block_idx, Some(key)).await?;
217 if !self.is_valid() {
218 self.seek_idx(block_idx - 1, None).await?;
220 }
221
222 Ok(())
223 }
224
225 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
226 stats.add(&self.stats)
227 }
228
229 fn value_meta(&self) -> ValueMeta {
230 ValueMeta {
231 object_id: Some(self.sst.id),
232 block_id: Some(self.cur_idx as _),
233 }
234 }
235}
236
237impl SstableIteratorType for BackwardSstableIterator {
238 fn create(
239 sstable: TableHolder,
240 sstable_store: SstableStoreRef,
241 _: Arc<SstableIteratorReadOptions>,
242 sstable_info_ref: &SstableInfo,
243 ) -> Self {
244 BackwardSstableIterator::new(sstable, sstable_store, sstable_info_ref)
245 }
246}
247
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use rand::prelude::*;
    use rand::rng as thread_rng;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::UserKey;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;

    use super::*;
    use crate::assert_bytes_eq;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_default_test_sstable,
        gen_test_sstable_with_table_ids, test_key_of, test_value_of,
    };

    /// Rewinds and walks the default test SSTable backwards, expecting keys
    /// `TEST_KEYS_COUNT - 1` down to `0` with their matching values.
    #[tokio::test]
    async fn test_backward_sstable_iterator() {
        let sstable_store = mock_sstable_store().await;
        let (handle, sstable_info) =
            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                .await;
        // The SSTable must span more than ten blocks.
        assert!(handle.meta.block_metas.len() > 10);

        let mut sstable_iter = BackwardSstableIterator::new(handle, sstable_store, &sstable_info);
        sstable_iter.rewind().await.unwrap();

        let mut remaining = TEST_KEYS_COUNT;
        while sstable_iter.is_valid() {
            remaining -= 1;
            let current_key = sstable_iter.key();
            let current_value = sstable_iter.value();
            assert_eq!(current_key, test_key_of(remaining).to_ref());
            assert_bytes_eq!(
                current_value.into_user_value().unwrap(),
                test_value_of(remaining)
            );
            sstable_iter.next().await.unwrap();
        }

        assert_eq!(remaining, 0);
    }

    /// Exercises `seek` with existing keys, out-of-range keys and keys that the test expects
    /// to resolve to a neighbouring entry.
    #[tokio::test]
    async fn test_backward_sstable_seek() {
        let sstable_store = mock_sstable_store().await;
        let (sstable, sstable_info) =
            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                .await;
        // The SSTable must span more than ten blocks.
        assert!(sstable.meta.block_metas.len() > 10);

        let mut sstable_iter = BackwardSstableIterator::new(sstable, sstable_store, &sstable_info);

        // Builds a full key in the default table from a raw table-key suffix and an epoch.
        let raw_key = |table_key_suffix: String, epoch| {
            FullKey::for_test(
                TableId::default(),
                [
                    VirtualNode::ZERO.to_be_bytes().as_slice(),
                    table_key_suffix.as_bytes(),
                ]
                .concat(),
                epoch,
            )
        };

        // Seeking every existing key, in random order, must land exactly on that key.
        let mut shuffled_indices = (0..TEST_KEYS_COUNT).collect_vec();
        shuffled_indices.shuffle(&mut thread_rng());
        for idx in shuffled_indices {
            let target = test_key_of(idx);
            sstable_iter.seek(target.to_ref()).await.unwrap();
            assert_eq!(sstable_iter.key(), target.to_ref());
        }

        // Seek into the middle, then walk backwards down to the first key.
        let middle = TEST_KEYS_COUNT - 500;
        sstable_iter
            .seek(test_key_of(middle).to_ref())
            .await
            .unwrap();
        for idx in (0..=middle).rev() {
            let key = sstable_iter.key();
            assert_eq!(key, test_key_of(idx).to_ref(), "key index:{}", idx);
            sstable_iter.next().await.unwrap();
        }
        assert!(!sstable_iter.is_valid());

        // A key beyond every stored key is expected to resolve to the last stored key.
        let largest_key = raw_key(format!("key_zzzz_{:05}", 0), test_epoch(1));
        sstable_iter.seek(largest_key.to_ref()).await.unwrap();
        let key = sstable_iter.key();
        assert_eq!(key, test_key_of(TEST_KEYS_COUNT - 1).to_ref());

        // A key before every stored key is expected to leave the iterator invalid.
        let smallest_key = raw_key(format!("key_aaaa_{:05}", 0), test_epoch(1));
        sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
        assert!(!sstable_iter.is_valid());

        // Probe keys `idx * 2 - 1` at epoch 0 are expected to resolve to `test_key_of(idx - 1)`.
        for idx in (1..TEST_KEYS_COUNT).rev() {
            let probe = raw_key(format!("key_test_{:05}", idx * 2 - 1), 0);
            sstable_iter.seek(probe.to_ref()).await.unwrap();

            let key = sstable_iter.key();
            assert_eq!(key, test_key_of(idx - 1).to_ref());
            sstable_iter.next().await.unwrap();
        }
        assert!(!sstable_iter.is_valid());
    }

    /// Checks that the iterator only yields entries of the table ids requested through the
    /// `SstableInfo` passed to `create`.
    #[tokio::test]
    async fn test_read_table_id_range() {
        let sstable_store = mock_sstable_store().await;

        // One key per table: key `i` belongs to table `i`.
        let [k1, k2, k3] = [1, 2, 3].map(|i| {
            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
            table_key.extend_from_slice(format!("key_test_{:05}", i).as_bytes());
            FullKey {
                user_key: UserKey::for_test(TableId::from(i), table_key),
                epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
            }
        });

        // Builds the `SstableInfo` that carries the table ids to read.
        let read_info_for = |table_ids| {
            SstableInfo::from(SstableInfoInner {
                table_ids,
                ..Default::default()
            })
        };

        // (table ids to read, key expected after rewind, expected entry count, last key seen)
        let scenarios = [
            (read_info_for(vec![1.into(), 2.into(), 3.into()]), &k3, 3, &k1),
            (read_info_for(vec![1.into(), 2.into()]), &k2, 2, &k1),
            (read_info_for(vec![2.into(), 3.into()]), &k3, 2, &k2),
            (read_info_for(vec![2.into()]), &k2, 1, &k2),
        ];

        for (read_info, expected_first_key, expected_cnt, expected_last_key) in scenarios {
            let kv_pairs = vec![
                (k1.clone(), HummockValue::put(test_value_of(1))),
                (k2.clone(), HummockValue::put(test_value_of(2))),
                (k3.clone(), HummockValue::put(test_value_of(3))),
            ];

            let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
                default_builder_opt_for_test(),
                10,
                kv_pairs.into_iter(),
                sstable_store.clone(),
                vec![1, 2, 3],
            )
            .await;

            let mut sstable_iter = BackwardSstableIterator::create(
                sstable,
                sstable_store.clone(),
                Arc::new(SstableIteratorReadOptions::default()),
                &read_info,
            );
            sstable_iter.rewind().await.unwrap();
            assert!(sstable_iter.is_valid());
            assert!(sstable_iter.key().eq(&expected_first_key.to_ref()));

            let mut cnt = 0;
            let mut last_key = k1.clone();
            while sstable_iter.is_valid() {
                last_key = sstable_iter.key().to_vec();
                cnt += 1;
                sstable_iter.next().await.unwrap();
            }

            assert_eq!(expected_cnt, cnt);
            assert_eq!(last_key, *expected_last_key);
        }
    }
}