1use std::cmp::Ordering::{Equal, Less};
16use std::sync::Arc;
17
18use foyer::Hint;
19use risingwave_hummock_sdk::key::FullKey;
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21
22use crate::hummock::iterator::{Backward, HummockIterator, ValueMeta};
23use crate::hummock::sstable::SstableIteratorReadOptions;
24use crate::hummock::value::HummockValue;
25use crate::hummock::{
26 BlockIterator, HummockResult, SstableIteratorType, SstableStoreRef, TableHolder,
27};
28use crate::monitor::StoreLocalStatistic;
29
/// Iterates over the entries of a single SST in descending key order.
///
/// The blocks visited by `rewind` and `next` are limited to `read_block_meta_range`,
/// which `new` derives from the `table_ids` of the SST's `SstableInfo`.
pub struct BackwardSstableIterator {
    /// Iterator over the currently loaded block. `None` before the first positioning call,
    /// and after `seek_idx` has been asked for a block index outside the range it accepts.
    block_iter: Option<BlockIterator>,

    /// Index of the block most recently loaded into `block_iter`; reported as the
    /// `block_id` of `value_meta`.
    cur_idx: usize,

    /// The SST being iterated; its block metas drive block selection.
    sst: TableHolder,

    /// Store used to fetch the blocks of `sst`.
    sstable_store: SstableStoreRef,

    /// Locally accumulated statistics, merged into a caller's stats by
    /// `collect_local_statistic`.
    stats: StoreLocalStatistic,

    /// Inclusive `(start, end)` range of block indices holding the tables this iterator reads.
    read_block_meta_range: (usize, usize),
}
48
49impl BackwardSstableIterator {
50 pub fn new(
51 sstable: TableHolder,
52 sstable_store: SstableStoreRef,
53 sstable_info_ref: &SstableInfo,
54 ) -> Self {
55 let mut start_idx = 0;
56 let mut end_idx = sstable.meta.block_metas.len() - 1;
57 assert!(
58 !sstable_info_ref.table_ids.is_empty(),
59 "BackwardSstableIterator: SST {} (object {}) has empty table_ids",
60 sstable_info_ref.sst_id,
61 sstable_info_ref.object_id,
62 );
63 let read_table_id_range = (
64 *sstable_info_ref.table_ids.first().unwrap(),
65 *sstable_info_ref.table_ids.last().unwrap(),
66 );
67 assert!(
68 read_table_id_range.0 <= read_table_id_range.1,
69 "invalid table id range {} - {}",
70 read_table_id_range.0,
71 read_table_id_range.1
72 );
73 let block_meta_count = sstable.meta.block_metas.len();
74 assert!(block_meta_count > 0);
75 assert!(
76 sstable.meta.block_metas[0].table_id() <= read_table_id_range.0,
77 "table id {} not found table_ids in block_meta {:?}",
78 read_table_id_range.0,
79 sstable
80 .meta
81 .block_metas
82 .iter()
83 .map(|meta| meta.table_id())
84 .collect::<Vec<_>>()
85 );
86 assert!(
87 sstable.meta.block_metas[block_meta_count - 1].table_id() >= read_table_id_range.1,
88 "table id {} not found table_ids in block_meta {:?}",
89 read_table_id_range.1,
90 sstable
91 .meta
92 .block_metas
93 .iter()
94 .map(|meta| meta.table_id())
95 .collect::<Vec<_>>()
96 );
97
98 while start_idx < block_meta_count
99 && sstable.meta.block_metas[start_idx].table_id() < read_table_id_range.0
100 {
101 start_idx += 1;
102 }
103 assert!(
105 start_idx < block_meta_count,
106 "table id {} not found table_ids in block_meta {:?}",
107 read_table_id_range.0,
108 sstable
109 .meta
110 .block_metas
111 .iter()
112 .map(|meta| meta.table_id())
113 .collect::<Vec<_>>()
114 );
115
116 while end_idx > start_idx
117 && sstable.meta.block_metas[end_idx].table_id() > read_table_id_range.1
118 {
119 end_idx -= 1;
120 }
121 assert!(
122 end_idx >= start_idx,
123 "end_idx {} < start_idx {} block_meta_count {}",
124 end_idx,
125 start_idx,
126 block_meta_count
127 );
128
129 Self {
130 block_iter: None,
131 cur_idx: end_idx,
132 sst: sstable,
133 sstable_store,
134 stats: StoreLocalStatistic::default(),
135 read_block_meta_range: (start_idx, end_idx),
136 }
137 }
138
139 async fn seek_idx(
141 &mut self,
142 idx: isize,
143 seek_key: Option<FullKey<&[u8]>>,
144 ) -> HummockResult<()> {
145 if idx >= self.sst.block_count() as isize || idx < self.read_block_meta_range.0 as isize {
146 self.block_iter = None;
147 } else {
148 let block = self
149 .sstable_store
150 .get(
151 &self.sst,
152 idx as usize,
153 crate::hummock::CachePolicy::Fill(Hint::Normal),
154 &mut self.stats,
155 )
156 .await?;
157 let mut block_iter = BlockIterator::new(block);
158 if let Some(key) = seek_key {
159 block_iter.seek_le(key);
160 } else {
161 block_iter.seek_to_last();
162 }
163
164 self.block_iter = Some(block_iter);
165 self.cur_idx = idx as usize;
166 }
167
168 Ok(())
169 }
170}
171
172impl HummockIterator for BackwardSstableIterator {
173 type Direction = Backward;
174
175 async fn next(&mut self) -> HummockResult<()> {
176 self.stats.total_key_count += 1;
177 let block_iter = self.block_iter.as_mut().expect("no block iter");
178 if block_iter.try_prev() {
179 Ok(())
180 } else {
181 self.seek_idx(self.cur_idx as isize - 1, None).await
183 }
184 }
185
186 fn key(&self) -> FullKey<&[u8]> {
187 self.block_iter.as_ref().expect("no block iter").key()
188 }
189
190 fn value(&self) -> HummockValue<&[u8]> {
191 let raw_value = self.block_iter.as_ref().expect("no block iter").value();
192
193 HummockValue::from_slice(raw_value).expect("decode error")
194 }
195
196 fn is_valid(&self) -> bool {
197 self.block_iter.as_ref().is_some_and(|i| i.is_valid())
198 }
199
200 async fn rewind(&mut self) -> HummockResult<()> {
203 self.seek_idx(self.read_block_meta_range.1 as isize, None)
204 .await
205 }
206
207 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
208 let block_idx = self
209 .sst
210 .meta
211 .block_metas
212 .partition_point(|block_meta| {
213 let ord = FullKey::decode(&block_meta.smallest_key).cmp(&key);
217 ord == Less || ord == Equal
218 })
219 .saturating_sub(1); let block_idx = block_idx as isize;
221
222 self.seek_idx(block_idx, Some(key)).await?;
223 if !self.is_valid() {
224 self.seek_idx(block_idx - 1, None).await?;
226 }
227
228 Ok(())
229 }
230
231 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
232 stats.add(&self.stats)
233 }
234
235 fn value_meta(&self) -> ValueMeta {
236 ValueMeta {
237 object_id: Some(self.sst.id),
238 block_id: Some(self.cur_idx as _),
239 }
240 }
241}
242
243impl SstableIteratorType for BackwardSstableIterator {
244 fn create(
245 sstable: TableHolder,
246 sstable_store: SstableStoreRef,
247 _: Arc<SstableIteratorReadOptions>,
248 sstable_info_ref: &SstableInfo,
249 ) -> Self {
250 BackwardSstableIterator::new(sstable, sstable_store, sstable_info_ref)
251 }
252}
253
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use rand::prelude::*;
    use rand::rng as thread_rng;
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_hummock_sdk::EpochWithGap;
    use risingwave_hummock_sdk::key::UserKey;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;

    use super::*;
    use crate::assert_bytes_eq;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        TEST_KEYS_COUNT, default_builder_opt_for_test, gen_default_test_sstable,
        gen_test_sstable_with_table_ids, test_key_of, test_value_of,
    };

    /// `rewind` followed by repeated `next` yields every key/value of the default test SST,
    /// from the last key down to the first.
    #[tokio::test]
    async fn test_backward_sstable_iterator() {
        let sstable_store = mock_sstable_store().await;
        // The SST must span many blocks so that block boundaries are crossed.
        let (handle, sstable_info) =
            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                .await;
        assert!(handle.meta.block_metas.len() > 10);
        let mut sstable_iter = BackwardSstableIterator::new(handle, sstable_store, &sstable_info);
        let mut cnt = TEST_KEYS_COUNT;
        sstable_iter.rewind().await.unwrap();

        while sstable_iter.is_valid() {
            cnt -= 1;
            let key = sstable_iter.key();
            let value = sstable_iter.value();
            assert_eq!(key, test_key_of(cnt).to_ref());
            assert_bytes_eq!(value.into_user_value().unwrap(), test_value_of(cnt));
            sstable_iter.next().await.unwrap();
        }

        // Every key was visited exactly once.
        assert_eq!(cnt, 0);
    }

    /// Exercises `seek` on existing keys, on keys before/after all entries, and on probe keys
    /// between existing keys.
    #[tokio::test]
    async fn test_backward_sstable_seek() {
        let sstable_store = mock_sstable_store().await;
        let (sstable, sstable_info) =
            gen_default_test_sstable(default_builder_opt_for_test(), 0, sstable_store.clone())
                .await;
        assert!(sstable.meta.block_metas.len() > 10);
        let mut sstable_iter = BackwardSstableIterator::new(sstable, sstable_store, &sstable_info);
        let mut all_key_to_test = (0..TEST_KEYS_COUNT).collect_vec();
        let mut rng = thread_rng();
        all_key_to_test.shuffle(&mut rng);

        // Seeking an existing key, in random order, lands exactly on that key.
        for i in all_key_to_test {
            sstable_iter.seek(test_key_of(i).to_ref()).await.unwrap();
            let key = sstable_iter.key();
            assert_eq!(key, test_key_of(i).to_ref());
        }

        // After a seek, `next` walks backward through every smaller key and then invalidates.
        sstable_iter
            .seek(test_key_of(TEST_KEYS_COUNT - 500).to_ref())
            .await
            .unwrap();
        for i in (0..TEST_KEYS_COUNT - 500 + 1).rev() {
            let key = sstable_iter.key();
            assert_eq!(key, test_key_of(i).to_ref(), "key index:{}", i);
            sstable_iter.next().await.unwrap();
        }
        assert!(!sstable_iter.is_valid());

        // A key expected to sort after every entry positions at the last key.
        let largest_key = FullKey::for_test(
            TableId::default(),
            [
                VirtualNode::ZERO.to_be_bytes().as_slice(),
                format!("key_zzzz_{:05}", 0).as_bytes(),
            ]
            .concat(),
            test_epoch(1),
        );
        sstable_iter.seek(largest_key.to_ref()).await.unwrap();
        let key = sstable_iter.key();
        assert_eq!(key, test_key_of(TEST_KEYS_COUNT - 1).to_ref());

        // A key expected to sort before every entry leaves the iterator invalid.
        let smallest_key = FullKey::for_test(
            TableId::default(),
            [
                VirtualNode::ZERO.to_be_bytes().as_slice(),
                format!("key_aaaa_{:05}", 0).as_bytes(),
            ]
            .concat(),
            test_epoch(1),
        );
        sstable_iter.seek(smallest_key.to_ref()).await.unwrap();
        assert!(!sstable_iter.is_valid());

        // Probe keys that are not stored: the test expects seek to land on
        // `test_key_of(idx - 1)`, the greatest stored key <= the probe.
        for idx in (1..TEST_KEYS_COUNT).rev() {
            sstable_iter
                .seek(
                    FullKey::for_test(
                        TableId::default(),
                        [
                            VirtualNode::ZERO.to_be_bytes().as_slice(),
                            format!("key_test_{:05}", idx * 2 - 1).as_bytes(),
                        ]
                        .concat(),
                        0,
                    )
                    .to_ref(),
                )
                .await
                .unwrap();

            let key = sstable_iter.key();
            assert_eq!(key, test_key_of(idx - 1).to_ref());
            sstable_iter.next().await.unwrap();
        }
        assert!(!sstable_iter.is_valid());
    }

    /// The iterator only visits entries of tables inside the `[first, last]` range of the
    /// `SstableInfo`'s `table_ids`, even though the SST itself stores tables 1, 2 and 3.
    #[tokio::test]
    async fn test_read_table_id_range() {
        {
            let sstable_store = mock_sstable_store().await;
            // One key per table id: k1 in table 1, k2 in table 2, k3 in table 3.
            let k1 = {
                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
                table_key.extend_from_slice(format!("key_test_{:05}", 1).as_bytes());
                let uk = UserKey::for_test(TableId::from(1), table_key);
                FullKey {
                    user_key: uk,
                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
                }
            };

            let k2 = {
                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
                table_key.extend_from_slice(format!("key_test_{:05}", 2).as_bytes());
                let uk = UserKey::for_test(TableId::from(2), table_key);
                FullKey {
                    user_key: uk,
                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
                }
            };

            let k3 = {
                let mut table_key = VirtualNode::ZERO.to_be_bytes().to_vec();
                table_key.extend_from_slice(format!("key_test_{:05}", 3).as_bytes());
                let uk = UserKey::for_test(TableId::from(3), table_key);
                FullKey {
                    user_key: uk,
                    epoch_with_gap: EpochWithGap::new_from_epoch(test_epoch(1)),
                }
            };

            // All three tables readable: k3, k2, k1.
            {
                let kv_pairs = vec![
                    (k1.clone(), HummockValue::put(test_value_of(1))),
                    (k2.clone(), HummockValue::put(test_value_of(2))),
                    (k3.clone(), HummockValue::put(test_value_of(3))),
                ];

                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
                    default_builder_opt_for_test(),
                    10,
                    kv_pairs.into_iter(),
                    sstable_store.clone(),
                    vec![1, 2, 3],
                )
                .await;
                let mut sstable_iter = BackwardSstableIterator::create(
                    sstable,
                    sstable_store.clone(),
                    Arc::new(SstableIteratorReadOptions::default()),
                    &SstableInfo::from(SstableInfoInner {
                        table_ids: vec![1.into(), 2.into(), 3.into()],
                        ..Default::default()
                    }),
                );
                sstable_iter.rewind().await.unwrap();
                assert!(sstable_iter.is_valid());
                assert!(sstable_iter.key().eq(&k3.to_ref()));

                let mut cnt = 0;
                let mut last_key = k1.clone();
                while sstable_iter.is_valid() {
                    last_key = sstable_iter.key().to_vec();
                    cnt += 1;
                    sstable_iter.next().await.unwrap();
                }

                assert_eq!(3, cnt);
                assert_eq!(last_key, k1.clone());
            }

            // Tables 1..=2 readable: table 3 is skipped, so rewind starts at k2.
            {
                let kv_pairs = vec![
                    (k1.clone(), HummockValue::put(test_value_of(1))),
                    (k2.clone(), HummockValue::put(test_value_of(2))),
                    (k3.clone(), HummockValue::put(test_value_of(3))),
                ];

                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
                    default_builder_opt_for_test(),
                    10,
                    kv_pairs.into_iter(),
                    sstable_store.clone(),
                    vec![1, 2, 3],
                )
                .await;

                let mut sstable_iter = BackwardSstableIterator::create(
                    sstable,
                    sstable_store.clone(),
                    Arc::new(SstableIteratorReadOptions::default()),
                    &SstableInfo::from(SstableInfoInner {
                        table_ids: vec![1.into(), 2.into()],
                        ..Default::default()
                    }),
                );
                sstable_iter.rewind().await.unwrap();
                assert!(sstable_iter.is_valid());
                assert!(sstable_iter.key().eq(&k2.to_ref()));

                let mut cnt = 0;
                let mut last_key = k1.clone();
                while sstable_iter.is_valid() {
                    last_key = sstable_iter.key().to_vec();
                    cnt += 1;
                    sstable_iter.next().await.unwrap();
                }

                assert_eq!(2, cnt);
                assert_eq!(last_key, k1.clone());
            }

            // Tables 2..=3 readable: iteration stops before table 1, ending at k2.
            {
                let kv_pairs = vec![
                    (k1.clone(), HummockValue::put(test_value_of(1))),
                    (k2.clone(), HummockValue::put(test_value_of(2))),
                    (k3.clone(), HummockValue::put(test_value_of(3))),
                ];

                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
                    default_builder_opt_for_test(),
                    10,
                    kv_pairs.into_iter(),
                    sstable_store.clone(),
                    vec![1, 2, 3],
                )
                .await;

                let mut sstable_iter = BackwardSstableIterator::create(
                    sstable,
                    sstable_store.clone(),
                    Arc::new(SstableIteratorReadOptions::default()),
                    &SstableInfo::from(SstableInfoInner {
                        table_ids: vec![2.into(), 3.into()],
                        ..Default::default()
                    }),
                );
                sstable_iter.rewind().await.unwrap();
                assert!(sstable_iter.is_valid());
                assert!(sstable_iter.key().eq(&k3.to_ref()));

                let mut cnt = 0;
                let mut last_key = k1.clone();
                while sstable_iter.is_valid() {
                    last_key = sstable_iter.key().to_vec();
                    cnt += 1;
                    sstable_iter.next().await.unwrap();
                }

                assert_eq!(2, cnt);
                assert_eq!(last_key, k2.clone());
            }

            // Only table 2 readable: exactly one entry, k2.
            {
                let kv_pairs = vec![
                    (k1.clone(), HummockValue::put(test_value_of(1))),
                    (k2.clone(), HummockValue::put(test_value_of(2))),
                    (k3.clone(), HummockValue::put(test_value_of(3))),
                ];

                let (sstable, _sstable_info) = gen_test_sstable_with_table_ids(
                    default_builder_opt_for_test(),
                    10,
                    kv_pairs.into_iter(),
                    sstable_store.clone(),
                    vec![1, 2, 3],
                )
                .await;

                let mut sstable_iter = BackwardSstableIterator::create(
                    sstable,
                    sstable_store.clone(),
                    Arc::new(SstableIteratorReadOptions::default()),
                    &SstableInfo::from(SstableInfoInner {
                        table_ids: vec![2.into()],
                        ..Default::default()
                    }),
                );
                sstable_iter.rewind().await.unwrap();
                assert!(sstable_iter.is_valid());
                assert!(sstable_iter.key().eq(&k2.to_ref()));

                let mut cnt = 0;
                let mut last_key = k1.clone();
                while sstable_iter.is_valid() {
                    last_key = sstable_iter.key().to_vec();
                    cnt += 1;
                    sstable_iter.next().await.unwrap();
                }

                assert_eq!(1, cnt);
                assert_eq!(last_key, k2.clone());
            }
        }
    }
}