1use std::cmp::Ordering::{Equal, Less};
16use std::sync::Arc;
17
18use foyer::CacheHint;
19use risingwave_hummock_sdk::key::FullKey;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21
22use crate::hummock::iterator::{Backward, HummockIterator, ValueMeta};
23use crate::hummock::sstable::SstableIteratorReadOptions;
24use crate::hummock::value::HummockValue;
25use crate::hummock::{
26 BlockIterator, HummockResult, SstableIteratorType, SstableStoreRef, TableHolder,
27};
28use crate::monitor::StoreLocalStatistic;
29
30pub struct BackwardSstableIterator {
32 block_iter: Option<BlockIterator>,
34
35 cur_idx: usize,
37
38 sst: TableHolder,
40
41 sstable_store: SstableStoreRef,
42
43 stats: StoreLocalStatistic,
44
45 read_block_meta_range: (usize, usize),
47}
48
49impl BackwardSstableIterator {
50 pub fn new(
51 sstable: TableHolder,
52 sstable_store: SstableStoreRef,
53 sstable_info_ref: &SstableInfo,
54 ) -> Self {
55 let mut start_idx = 0;
56 let mut end_idx = sstable.meta.block_metas.len() - 1;
57 let read_table_id_range = (
58 *sstable_info_ref.table_ids.first().unwrap(),
59 *sstable_info_ref.table_ids.last().unwrap(),
60 );
61 assert!(
62 read_table_id_range.0 <= read_table_id_range.1,
63 "invalid table id range {} - {}",
64 read_table_id_range.0,
65 read_table_id_range.1
66 );
67 let block_meta_count = sstable.meta.block_metas.len();
68 assert!(block_meta_count > 0);
69 assert!(
70 sstable.meta.block_metas[0].table_id().table_id() <= read_table_id_range.0,
71 "table id {} not found table_ids in block_meta {:?}",
72 read_table_id_range.0,
73 sstable
74 .meta
75 .block_metas
76 .iter()
77 .map(|meta| meta.table_id())
78 .collect::<Vec<_>>()
79 );
80 assert!(
81 sstable.meta.block_metas[block_meta_count - 1]
82 .table_id()
83 .table_id()
84 >= read_table_id_range.1,
85 "table id {} not found table_ids in block_meta {:?}",
86 read_table_id_range.1,
87 sstable
88 .meta
89 .block_metas
90 .iter()
91 .map(|meta| meta.table_id())
92 .collect::<Vec<_>>()
93 );
94
95 while start_idx < block_meta_count
96 && sstable.meta.block_metas[start_idx].table_id().table_id() < read_table_id_range.0
97 {
98 start_idx += 1;
99 }
100 assert!(
102 start_idx < block_meta_count,
103 "table id {} not found table_ids in block_meta {:?}",
104 read_table_id_range.0,
105 sstable
106 .meta
107 .block_metas
108 .iter()
109 .map(|meta| meta.table_id())
110 .collect::<Vec<_>>()
111 );
112
113 while end_idx > start_idx
114 && sstable.meta.block_metas[end_idx].table_id().table_id() > read_table_id_range.1
115 {
116 end_idx -= 1;
117 }
118 assert!(
119 end_idx >= start_idx,
120 "end_idx {} < start_idx {} block_meta_count {}",
121 end_idx,
122 start_idx,
123 block_meta_count
124 );
125
126 Self {
127 block_iter: None,
128 cur_idx: end_idx,
129 sst: sstable,
130 sstable_store,
131 stats: StoreLocalStatistic::default(),
132 read_block_meta_range: (start_idx, end_idx),
133 }
134 }
135
136 async fn seek_idx(
138 &mut self,
139 idx: isize,
140 seek_key: Option<FullKey<&[u8]>>,
141 ) -> HummockResult<()> {
142 if idx >= self.sst.block_count() as isize || idx < self.read_block_meta_range.0 as isize {
143 self.block_iter = None;
144 } else {
145 let block = self
146 .sstable_store
147 .get(
148 &self.sst,
149 idx as usize,
150 crate::hummock::CachePolicy::Fill(CacheHint::Normal),
151 &mut self.stats,
152 )
153 .await?;
154 let mut block_iter = BlockIterator::new(block);
155 if let Some(key) = seek_key {
156 block_iter.seek_le(key);
157 } else {
158 block_iter.seek_to_last();
159 }
160
161 self.block_iter = Some(block_iter);
162 self.cur_idx = idx as usize;
163 }
164
165 Ok(())
166 }
167}
168
169impl HummockIterator for BackwardSstableIterator {
170 type Direction = Backward;
171
172 async fn next(&mut self) -> HummockResult<()> {
173 self.stats.total_key_count += 1;
174 let block_iter = self.block_iter.as_mut().expect("no block iter");
175 if block_iter.try_prev() {
176 Ok(())
177 } else {
178 self.seek_idx(self.cur_idx as isize - 1, None).await
180 }
181 }
182
183 fn key(&self) -> FullKey<&[u8]> {
184 self.block_iter.as_ref().expect("no block iter").key()
185 }
186
187 fn value(&self) -> HummockValue<&[u8]> {
188 let raw_value = self.block_iter.as_ref().expect("no block iter").value();
189
190 HummockValue::from_slice(raw_value).expect("decode error")
191 }
192
193 fn is_valid(&self) -> bool {
194 self.block_iter.as_ref().is_some_and(|i| i.is_valid())
195 }
196
197 async fn rewind(&mut self) -> HummockResult<()> {
200 self.seek_idx(self.read_block_meta_range.1 as isize, None)
201 .await
202 }
203
204 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
205 let block_idx = self
206 .sst
207 .meta
208 .block_metas
209 .partition_point(|block_meta| {
210 let ord = FullKey::decode(&block_meta.smallest_key).cmp(&key);
214 ord == Less || ord == Equal
215 })
216 .saturating_sub(1); let block_idx = block_idx as isize;
218
219 self.seek_idx(block_idx, Some(key)).await?;
220 if !self.is_valid() {
221 self.seek_idx(block_idx - 1, None).await?;
223 }
224
225 Ok(())
226 }
227
228 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
229 stats.add(&self.stats)
230 }
231
232 fn value_meta(&self) -> ValueMeta {
233 ValueMeta {
234 object_id: Some(self.sst.id),
235 block_id: Some(self.cur_idx as _),
236 }
237 }
238}
239
240impl SstableIteratorType for BackwardSstableIterator {
241 fn create(
242 sstable: TableHolder,
243 sstable_store: SstableStoreRef,
244 _: Arc<SstableIteratorReadOptions>,
245 sstable_info_ref: &SstableInfo,
246 ) -> Self {
247 BackwardSstableIterator::new(sstable, sstable_store, sstable_info_ref)
248 }
249}
250
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use rand::prelude::*;
    use rand::rng as thread_rng;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::UserKey;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;

    use super::*;
    use crate::assert_bytes_eq;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_default_test_sstable,
        gen_test_sstable_with_table_ids, test_key_of, test_value_of,
    };

    /// Rewinding and then stepping must yield every test key in descending order.
    #[tokio::test]
    async fn test_backward_sstable_iterator() {
        let sstable_store = mock_sstable_store().await;
        let (handle, sstable_info) =
            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                .await;
        // The sstable must span several blocks for the test to be meaningful.
        assert!(handle.meta.block_metas.len() > 10);

        let mut sstable_iter = BackwardSstableIterator::new(handle, sstable_store, &sstable_info);
        sstable_iter.rewind().await.unwrap();

        let mut cnt = TEST_KEYS_COUNT;
        while sstable_iter.is_valid() {
            cnt -= 1;
            assert_eq!(sstable_iter.key(), test_key_of(cnt).to_ref());
            assert_bytes_eq!(
                sstable_iter.value().into_user_value().unwrap(),
                test_value_of(cnt)
            );
            sstable_iter.next().await.unwrap();
        }

        assert_eq!(cnt, 0);
    }

    /// Exercises `seek` with existing keys, keys outside the stored range and keys that fall
    /// between two stored keys.
    #[tokio::test]
    async fn test_backward_sstable_seek() {
        let sstable_store = mock_sstable_store().await;
        let (sstable, sstable_info) =
            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                .await;
        // The sstable must span several blocks for the test to be meaningful.
        assert!(sstable.meta.block_metas.len() > 10);

        let mut sstable_iter = BackwardSstableIterator::new(sstable, sstable_store, &sstable_info);

        // Seeking to any existing key, in random order, lands exactly on that key.
        let mut all_key_to_test = (0..TEST_KEYS_COUNT).collect_vec();
        let mut rng = thread_rng();
        all_key_to_test.shuffle(&mut rng);
        for i in all_key_to_test {
            sstable_iter.seek(test_key_of(i).to_ref()).await.unwrap();
            assert_eq!(sstable_iter.key(), test_key_of(i).to_ref());
        }

        // Seek into the middle of the sstable and walk backward to the very first key.
        sstable_iter
            .seek(test_key_of(TEST_KEYS_COUNT - 500).to_ref())
            .await
            .unwrap();
        for i in (0..=TEST_KEYS_COUNT - 500).rev() {
            assert_eq!(
                sstable_iter.key(),
                test_key_of(i).to_ref(),
                "key index:{}",
                i
            );
            sstable_iter.next().await.unwrap();
        }
        assert!(!sstable_iter.is_valid());

        // A key larger than every stored key lands on the last stored key.
        let largest_key = FullKey::for_test(
            TableId::default(),
            [
                VirtualNode::ZERO.to_be_bytes().as_slice(),
                format!("key_zzzz_{:05}", 0).as_bytes(),
            ]
            .concat(),
            test_epoch(1),
        );
        sstable_iter.seek(largest_key.to_ref()).await.unwrap();
        assert_eq!(
            sstable_iter.key(),
            test_key_of(TEST_KEYS_COUNT - 1).to_ref()
        );

        // A key smaller than every stored key leaves the iterator invalid.
        let smallest_key = FullKey::for_test(
            TableId::default(),
            [
                VirtualNode::ZERO.to_be_bytes().as_slice(),
                format!("key_aaaa_{:05}", 0).as_bytes(),
            ]
            .concat(),
            test_epoch(1),
        );
        sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
        assert!(!sstable_iter.is_valid());

        // A key that falls between two stored keys lands on the smaller neighbour.
        for idx in (1..TEST_KEYS_COUNT).rev() {
            let in_between_key = FullKey::for_test(
                TableId::default(),
                [
                    VirtualNode::ZERO.to_be_bytes().as_slice(),
                    format!("key_test_{:05}", idx * 2 - 1).as_bytes(),
                ]
                .concat(),
                0,
            );
            sstable_iter.seek(in_between_key.to_ref()).await.unwrap();

            assert_eq!(sstable_iter.key(), test_key_of(idx - 1).to_ref());
            sstable_iter.next().await.unwrap();
        }
        assert!(!sstable_iter.is_valid());
    }

    /// The iterator must only expose entries of the tables listed in the `SstableInfo` it was
    /// created with, even though the sstable itself holds three tables.
    #[tokio::test]
    async fn test_read_table_id_range() {
        let sstable_store = mock_sstable_store().await;

        // One key per table: table `id` holds `key_test_{id}`.
        let make_key = |id: u32| {
            let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
            table_key.extend_from_slice(format!("key_test_{:05}", id).as_bytes());
            FullKey {
                user_key: UserKey::for_test(TableId::from(id), table_key),
                epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
            }
        };
        let k1 = make_key(1);
        let k2 = make_key(2);
        let k3 = make_key(3);

        // (table ids to read, key after rewind, last key visited, number of keys visited)
        let cases = [
            (vec![1, 2, 3], &k3, &k1, 3),
            (vec![1, 2], &k2, &k1, 2),
            (vec![2, 3], &k3, &k2, 2),
            (vec![2], &k2, &k2, 1),
        ];

        for (read_table_ids, expected_first, expected_last, expected_cnt) in cases {
            let kv_pairs = vec![
                (k1.clone(), HummockValue::put(test_value_of(1))),
                (k2.clone(), HummockValue::put(test_value_of(2))),
                (k3.clone(), HummockValue::put(test_value_of(3))),
            ];

            let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
                default_builder_opt_for_test(),
                10,
                kv_pairs.into_iter(),
                sstable_store.clone(),
                vec![1, 2, 3],
            )
            .await;

            let mut sstable_iter = BackwardSstableIterator::create(
                sstable,
                sstable_store.clone(),
                Arc::new(SstableIteratorReadOptions::default()),
                &SstableInfo::from(SstableInfoInner {
                    table_ids: read_table_ids,
                    ..Default::default()
                }),
            );
            sstable_iter.rewind().await.unwrap();
            assert!(sstable_iter.is_valid());
            assert!(sstable_iter.key().eq(&expected_first.to_ref()));

            let mut cnt = 0;
            let mut last_key = k1.clone();
            while sstable_iter.is_valid() {
                last_key = sstable_iter.key().to_vec();
                cnt += 1;
                sstable_iter.next().await.unwrap();
            }

            assert_eq!(expected_cnt, cnt);
            assert_eq!(last_key, expected_last.clone());
        }
    }
}