risingwave_storage/hummock/local_version/
pinned_version.rs1use std::collections::{BTreeMap, HashMap};
16use std::iter::empty;
17use std::ops::Deref;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use auto_enums::auto_enum;
22use parking_lot::RwLock;
23use risingwave_common::catalog::TableId;
24use risingwave_hummock_sdk::change_log::TableChangeLogCommon;
25use risingwave_hummock_sdk::level::{Level, Levels};
26use risingwave_hummock_sdk::sstable_info::SstableInfo;
27use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
28use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
29use risingwave_rpc_client::HummockMetaClient;
30use thiserror_ext::AsReport;
31use tokio::sync::mpsc::error::TryRecvError;
32use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
33use tokio_retry::strategy::jitter;
34
/// A request sent to the pinned-version worker (`start_pinned_version_worker`) over its
/// unbounded channel.
#[derive(Debug, Clone)]
pub enum PinVersionAction {
    /// Adds one pin on the given version; the worker increments its pin count.
    Pin(HummockVersionId),
    /// Releases one pin on the given version; the worker decrements its pin count.
    Unpin(HummockVersionId),
}
40
/// Holds one pin on `version_id`.
///
/// `PinnedVersionGuard::new` sends `PinVersionAction::Pin` for the id, and dropping the guard
/// sends the matching `PinVersionAction::Unpin`.
struct PinnedVersionGuard {
    /// Id of the version this guard keeps pinned.
    version_id: HummockVersionId,
    /// Channel to the pinned-version worker; receives this guard's pin/unpin actions.
    pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
}
45
46impl PinnedVersionGuard {
47 fn new(
49 version_id: HummockVersionId,
50 pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
51 ) -> Self {
52 if pinned_version_manager_tx
53 .send(PinVersionAction::Pin(version_id))
54 .is_err()
55 {
56 tracing::warn!("failed to send req pin version id{}", version_id);
57 }
58
59 Self {
60 version_id,
61 pinned_version_manager_tx,
62 }
63 }
64}
65
66impl Drop for PinnedVersionGuard {
67 fn drop(&mut self) {
68 if self
69 .pinned_version_manager_tx
70 .send(PinVersionAction::Unpin(self.version_id))
71 .is_err()
72 {
73 tracing::warn!("failed to send req unpin version id: {}", self.version_id);
74 }
75 }
76}
77
/// A `LocalHummockVersion` kept pinned for as long as this value (or any clone of it) is alive.
///
/// Cloning is cheap: every field is an `Arc`, so clones share the version, the pin guard and
/// the table change-log map.
#[derive(Clone)]
pub struct PinnedVersion {
    /// The pinned version, with its table change logs split off into `table_change_log`.
    version: Arc<LocalHummockVersion>,
    /// Sends `Unpin` for `version` once the last clone holding this `Arc` is dropped.
    guard: Arc<PinnedVersionGuard>,
    /// Per-table change logs behind a lock; shared with versions derived through
    /// `new_with_local_version`.
    table_change_log: Arc<RwLock<HashMap<TableId, TableChangeLogCommon<SstableInfo>>>>,
}
84
85impl Deref for PinnedVersion {
86 type Target = LocalHummockVersion;
87
88 fn deref(&self) -> &Self::Target {
89 &self.version
90 }
91}
92
93impl PinnedVersion {
94 pub fn new(
95 version: HummockVersion,
96 pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
97 ) -> Self {
98 let version_id = version.id;
99 let (local_version, table_id_to_change_logs) = version.split_change_log();
100 PinnedVersion {
101 guard: Arc::new(PinnedVersionGuard::new(
102 version_id,
103 pinned_version_manager_tx,
104 )),
105 table_change_log: Arc::new(RwLock::new(table_id_to_change_logs)),
106 version: Arc::new(local_version),
107 }
108 }
109
110 pub fn new_pin_version(&self, version: HummockVersion) -> Option<Self> {
111 assert!(
112 version.id >= self.version.id,
113 "pinning a older version {}. Current is {}",
114 version.id,
115 self.version.id
116 );
117 if version.id == self.version.id {
118 return None;
119 }
120 let version_id = version.id;
121 let (local_version, table_id_to_change_logs) = version.split_change_log();
122 Some(PinnedVersion {
123 guard: Arc::new(PinnedVersionGuard::new(
124 version_id,
125 self.guard.pinned_version_manager_tx.clone(),
126 )),
127 table_change_log: Arc::new(RwLock::new(table_id_to_change_logs)),
128 version: Arc::new(local_version),
129 })
130 }
131
132 pub fn new_with_local_version(&self, version: LocalHummockVersion) -> Option<Self> {
134 assert!(
135 version.id >= self.version.id,
136 "pinning a older version {}. Current is {}",
137 version.id,
138 self.version.id
139 );
140 if version.id == self.version.id {
141 return None;
142 }
143
144 let version_id = version.id;
145
146 Some(PinnedVersion {
147 guard: Arc::new(PinnedVersionGuard::new(
148 version_id,
149 self.guard.pinned_version_manager_tx.clone(),
150 )),
151 table_change_log: self.table_change_log.clone(),
152 version: Arc::new(version),
153 })
154 }
155
156 pub fn id(&self) -> HummockVersionId {
157 self.version.id
158 }
159
160 pub fn is_valid(&self) -> bool {
161 self.version.id != INVALID_VERSION_ID
162 }
163
164 fn levels_by_compaction_groups_id(&self, compaction_group_id: CompactionGroupId) -> &Levels {
165 self.version
166 .levels
167 .get(&compaction_group_id)
168 .unwrap_or_else(|| {
169 panic!(
170 "levels for compaction group {} not found in version {}",
171 compaction_group_id,
172 self.id()
173 )
174 })
175 }
176
177 pub fn levels(&self, table_id: TableId) -> impl Iterator<Item = &Level> {
178 #[auto_enum(Iterator)]
179 match self.version.state_table_info.info().get(&table_id) {
180 Some(info) => {
181 let compaction_group_id = info.compaction_group_id;
182 let levels = self.levels_by_compaction_groups_id(compaction_group_id);
183 levels
184 .l0
185 .sub_levels
186 .iter()
187 .rev()
188 .chain(levels.levels.iter())
189 }
190 None => empty(),
191 }
192 }
193
194 pub fn table_change_log_read_lock(
195 &self,
196 ) -> parking_lot::RwLockReadGuard<'_, HashMap<TableId, TableChangeLogCommon<SstableInfo>>> {
197 self.table_change_log.read()
198 }
199
200 pub fn table_change_log_write_lock(
201 &self,
202 ) -> parking_lot::RwLockWriteGuard<'_, HashMap<TableId, TableChangeLogCommon<SstableInfo>>>
203 {
204 self.table_change_log.write()
205 }
206}
207
208pub(crate) async fn start_pinned_version_worker(
209 mut rx: UnboundedReceiver<PinVersionAction>,
210 hummock_meta_client: Arc<dyn HummockMetaClient>,
211 max_version_pinning_duration_sec: u64,
212) {
213 let min_execute_interval = Duration::from_millis(1000);
214 let max_retry_interval = Duration::from_secs(10);
215 let get_backoff_strategy = || {
216 tokio_retry::strategy::ExponentialBackoff::from_millis(10)
217 .max_delay(max_retry_interval)
218 .map(jitter)
219 };
220 let mut retry_backoff = get_backoff_strategy();
221 let mut min_execute_interval_tick = tokio::time::interval(min_execute_interval);
222 min_execute_interval_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
223 let mut need_unpin = false;
224
225 let mut version_ids_in_use: BTreeMap<HummockVersionId, (usize, Instant)> = BTreeMap::new();
226 let max_version_pinning_duration_sec = Duration::from_secs(max_version_pinning_duration_sec);
227 loop {
229 min_execute_interval_tick.tick().await;
230 while version_ids_in_use.len() > 1
232 && let Some(e) = version_ids_in_use.first_entry()
233 {
234 if e.get().1.elapsed() < max_version_pinning_duration_sec {
235 break;
236 }
237 need_unpin = true;
238 e.remove();
239 }
240
241 let mut versions_to_unpin = vec![];
243 let inst = Instant::now();
244 'collect: loop {
245 match rx.try_recv() {
246 Ok(version_action) => match version_action {
247 PinVersionAction::Pin(version_id) => {
248 version_ids_in_use
249 .entry(version_id)
250 .and_modify(|e| {
251 e.0 += 1;
252 e.1 = inst;
253 })
254 .or_insert((1, inst));
255 }
256 PinVersionAction::Unpin(version_id) => {
257 versions_to_unpin.push(version_id);
258 }
259 },
260 Err(err) => match err {
261 TryRecvError::Empty => {
262 break 'collect;
263 }
264 TryRecvError::Disconnected => {
265 tracing::info!("Shutdown hummock unpin worker");
266 return;
267 }
268 },
269 }
270 }
271 if !versions_to_unpin.is_empty() {
272 need_unpin = true;
273 }
274 if !need_unpin {
275 continue;
276 }
277
278 for version in &versions_to_unpin {
279 match version_ids_in_use.get_mut(version) {
280 Some((counter, _)) => {
281 *counter -= 1;
282 if *counter == 0 {
283 version_ids_in_use.remove(version);
284 }
285 }
286 None => tracing::warn!(
287 "version {} to unpin does not exist, may already be unpinned due to expiration",
288 version
289 ),
290 }
291 }
292
293 match version_ids_in_use.first_entry() {
294 Some(unpin_before) => {
295 match hummock_meta_client
297 .unpin_version_before(*unpin_before.key())
298 .await
299 {
300 Ok(_) => {
301 versions_to_unpin.clear();
302 need_unpin = false;
303 retry_backoff = get_backoff_strategy();
304 }
305 Err(err) => {
306 let retry_after = retry_backoff.next().unwrap_or(max_retry_interval);
307 tracing::warn!(
308 error = %err.as_report(),
309 "Failed to unpin version. Will retry after about {} milliseconds",
310 retry_after.as_millis()
311 );
312 tokio::time::sleep(retry_after).await;
313 }
314 }
315 }
316 None => tracing::warn!("version_ids_in_use is empty!"),
317 }
318 }
319}