risingwave_storage/hummock/iterator/
concat_inner.rs1use std::cmp::Ordering::{Equal, Greater, Less};
16use std::sync::Arc;
17
18use risingwave_hummock_sdk::key::FullKey;
19use risingwave_hummock_sdk::sstable_info::SstableInfo;
20
21use crate::hummock::iterator::{
22 DirectionEnum, HummockIterator, HummockIteratorDirection, ValueMeta,
23};
24use crate::hummock::sstable::SstableIteratorReadOptions;
25use crate::hummock::value::HummockValue;
26use crate::hummock::{HummockResult, SstableIteratorType, SstableStoreRef};
27use crate::monitor::StoreLocalStatistic;
28
29fn smallest_key(sstable_info: &SstableInfo) -> &[u8] {
30 &sstable_info.key_range.left
31}
32
33fn largest_key(sstable_info: &SstableInfo) -> &[u8] {
34 &sstable_info.key_range.right
35}
36
37pub struct ConcatIteratorInner<TI: SstableIteratorType> {
39 sstable_iter: Option<TI>,
41
42 cur_idx: usize,
44
45 sstable_infos: Vec<SstableInfo>,
47
48 sstable_store: SstableStoreRef,
49
50 stats: StoreLocalStatistic,
51 read_options: Arc<SstableIteratorReadOptions>,
52}
53
54impl<TI: SstableIteratorType> ConcatIteratorInner<TI> {
55 pub fn new(
59 sstable_infos: Vec<SstableInfo>,
60 sstable_store: SstableStoreRef,
61 read_options: Arc<SstableIteratorReadOptions>,
62 ) -> Self {
63 Self {
64 sstable_iter: None,
65 cur_idx: 0,
66 sstable_infos,
67 sstable_store,
68 stats: StoreLocalStatistic::default(),
69 read_options,
70 }
71 }
72
73 async fn seek_idx(
75 &mut self,
76 idx: usize,
77 seek_key: Option<FullKey<&[u8]>>,
78 ) -> HummockResult<()> {
79 if idx >= self.sstable_infos.len() {
80 if let Some(old_iter) = self.sstable_iter.take() {
81 old_iter.collect_local_statistic(&mut self.stats);
82 }
83 self.cur_idx = self.sstable_infos.len();
84 } else {
85 let table = self
86 .sstable_store
87 .sstable(&self.sstable_infos[idx], &mut self.stats)
88 .await?;
89 let mut sstable_iter = TI::create(
90 table,
91 self.sstable_store.clone(),
92 self.read_options.clone(),
93 &self.sstable_infos[idx],
94 );
95
96 if let Some(key) = seek_key {
97 sstable_iter.seek(key).await?;
98 } else {
99 sstable_iter.rewind().await?;
100 }
101
102 if let Some(old_iter) = self.sstable_iter.take() {
103 old_iter.collect_local_statistic(&mut self.stats);
104 }
105
106 self.sstable_iter = Some(sstable_iter);
107 self.cur_idx = idx;
108 }
109 Ok(())
110 }
111}
112
113impl<TI: SstableIteratorType> HummockIterator for ConcatIteratorInner<TI> {
114 type Direction = TI::Direction;
115
116 async fn next(&mut self) -> HummockResult<()> {
117 let sstable_iter = self.sstable_iter.as_mut().expect("no table iter");
118 sstable_iter.next().await?;
119
120 if sstable_iter.is_valid() {
121 Ok(())
122 } else {
123 let mut table_idx = self.cur_idx + 1;
125 while !self.is_valid() && table_idx < self.sstable_infos.len() {
126 self.seek_idx(table_idx, None).await?;
127 table_idx += 1;
128 }
129 Ok(())
130 }
131 }
132
133 fn key(&self) -> FullKey<&[u8]> {
134 self.sstable_iter.as_ref().expect("no table iter").key()
135 }
136
137 fn value(&self) -> HummockValue<&[u8]> {
138 self.sstable_iter.as_ref().expect("no table iter").value()
139 }
140
141 fn is_valid(&self) -> bool {
142 self.sstable_iter.as_ref().is_some_and(|i| i.is_valid())
143 }
144
145 async fn rewind(&mut self) -> HummockResult<()> {
146 self.seek_idx(0, None).await?;
147 let mut table_idx = 1;
148 while !self.is_valid() && table_idx < self.sstable_infos.len() {
149 self.seek_idx(table_idx, None).await?;
151 table_idx += 1;
152 }
153 Ok(())
154 }
155
156 async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
157 let mut table_idx = self
158 .sstable_infos
159 .partition_point(|table| match Self::Direction::direction() {
160 DirectionEnum::Forward => {
161 let ord = FullKey::decode(smallest_key(table)).cmp(&key);
162
163 ord == Less || ord == Equal
164 }
165 DirectionEnum::Backward => {
166 let ord = FullKey::decode(largest_key(table)).cmp(&key);
167 ord == Greater || (ord == Equal && !table.key_range.right_exclusive)
168 }
169 })
170 .saturating_sub(1); self.seek_idx(table_idx, Some(key)).await?;
173 table_idx += 1;
174 while !self.is_valid() && table_idx < self.sstable_infos.len() {
175 self.seek_idx(table_idx, None).await?;
177 table_idx += 1;
178 }
179 Ok(())
180 }
181
182 fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
183 stats.add(&self.stats);
184 if let Some(iter) = &self.sstable_iter {
185 iter.collect_local_statistic(stats);
186 }
187 }
188
189 fn value_meta(&self) -> ValueMeta {
190 self.sstable_iter
191 .as_ref()
192 .expect("no table iter")
193 .value_meta()
194 }
195}