// risingwave_storage/hummock/local_version/pinned_version.rs
use std::collections::{BTreeMap, HashMap};
16use std::iter::empty;
17use std::ops::Deref;
18use std::sync::{Arc, LazyLock};
19use std::time::{Duration, Instant};
20
21use auto_enums::auto_enum;
22use parking_lot::RwLock;
23use risingwave_common::catalog::TableId;
24use risingwave_common::log::LogSuppressor;
25use risingwave_hummock_sdk::change_log::TableChangeLogCommon;
26use risingwave_hummock_sdk::level::{Level, Levels};
27use risingwave_hummock_sdk::sstable_info::SstableInfo;
28use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
29use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
30use risingwave_rpc_client::HummockMetaClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc::error::TryRecvError;
33use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
34use tokio_retry::strategy::jitter;
35
36#[derive(Debug, Clone)]
37pub enum PinVersionAction {
38 Pin(HummockVersionId),
39 Unpin(HummockVersionId),
40}
41
42struct PinnedVersionGuard {
43 version_id: HummockVersionId,
44 pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
45}
46
47impl PinnedVersionGuard {
48 fn new(
50 version_id: HummockVersionId,
51 pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
52 ) -> Self {
53 if pinned_version_manager_tx
54 .send(PinVersionAction::Pin(version_id))
55 .is_err()
56 {
57 tracing::warn!("failed to send req pin version id{}", version_id);
58 }
59
60 Self {
61 version_id,
62 pinned_version_manager_tx,
63 }
64 }
65}
66
67impl Drop for PinnedVersionGuard {
68 fn drop(&mut self) {
69 if self
70 .pinned_version_manager_tx
71 .send(PinVersionAction::Unpin(self.version_id))
72 .is_err()
73 {
74 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
75 LazyLock::new(|| LogSuppressor::per_second(1));
76 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
77 tracing::warn!(
78 suppressed_count,
79 version_id = %self.version_id,
80 "failed to send req unpin"
81 );
82 }
83 }
84 }
85}
86
87#[derive(Clone)]
88pub struct PinnedVersion {
89 version: Arc<LocalHummockVersion>,
90 guard: Arc<PinnedVersionGuard>,
91 table_change_log: Arc<RwLock<HashMap<TableId, TableChangeLogCommon<SstableInfo>>>>,
92}
93
94impl Deref for PinnedVersion {
95 type Target = LocalHummockVersion;
96
97 fn deref(&self) -> &Self::Target {
98 &self.version
99 }
100}
101
102impl PinnedVersion {
103 pub fn new(
104 version: HummockVersion,
105 pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
106 ) -> Self {
107 let version_id = version.id;
108 let (local_version, table_id_to_change_logs) = version.split_change_log();
109 PinnedVersion {
110 guard: Arc::new(PinnedVersionGuard::new(
111 version_id,
112 pinned_version_manager_tx,
113 )),
114 table_change_log: Arc::new(RwLock::new(table_id_to_change_logs)),
115 version: Arc::new(local_version),
116 }
117 }
118
119 pub fn new_pin_version(&self, version: HummockVersion) -> Option<Self> {
120 assert!(
121 version.id >= self.version.id,
122 "pinning a older version {}. Current is {}",
123 version.id,
124 self.version.id
125 );
126 if version.id == self.version.id {
127 return None;
128 }
129 let version_id = version.id;
130 let (local_version, table_id_to_change_logs) = version.split_change_log();
131 Some(PinnedVersion {
132 guard: Arc::new(PinnedVersionGuard::new(
133 version_id,
134 self.guard.pinned_version_manager_tx.clone(),
135 )),
136 table_change_log: Arc::new(RwLock::new(table_id_to_change_logs)),
137 version: Arc::new(local_version),
138 })
139 }
140
141 pub fn new_with_local_version(&self, version: LocalHummockVersion) -> Option<Self> {
143 assert!(
144 version.id >= self.version.id,
145 "pinning a older version {}. Current is {}",
146 version.id,
147 self.version.id
148 );
149 if version.id == self.version.id {
150 return None;
151 }
152
153 let version_id = version.id;
154
155 Some(PinnedVersion {
156 guard: Arc::new(PinnedVersionGuard::new(
157 version_id,
158 self.guard.pinned_version_manager_tx.clone(),
159 )),
160 table_change_log: self.table_change_log.clone(),
161 version: Arc::new(version),
162 })
163 }
164
165 pub fn id(&self) -> HummockVersionId {
166 self.version.id
167 }
168
169 pub fn is_valid(&self) -> bool {
170 self.version.id != INVALID_VERSION_ID
171 }
172
173 fn levels_by_compaction_groups_id(&self, compaction_group_id: CompactionGroupId) -> &Levels {
174 self.version
175 .levels
176 .get(&compaction_group_id)
177 .unwrap_or_else(|| {
178 panic!(
179 "levels for compaction group {} not found in version {}",
180 compaction_group_id,
181 self.id()
182 )
183 })
184 }
185
186 pub fn levels(&self, table_id: TableId) -> impl Iterator<Item = &Level> {
187 #[auto_enum(Iterator)]
188 match self.version.state_table_info.info().get(&table_id) {
189 Some(info) => {
190 let compaction_group_id = info.compaction_group_id;
191 let levels = self.levels_by_compaction_groups_id(compaction_group_id);
192 levels
193 .l0
194 .sub_levels
195 .iter()
196 .rev()
197 .chain(levels.levels.iter())
198 }
199 None => empty(),
200 }
201 }
202
203 pub fn table_change_log_read_lock(
204 &self,
205 ) -> parking_lot::RwLockReadGuard<'_, HashMap<TableId, TableChangeLogCommon<SstableInfo>>> {
206 self.table_change_log.read()
207 }
208
209 pub fn table_change_log_write_lock(
210 &self,
211 ) -> parking_lot::RwLockWriteGuard<'_, HashMap<TableId, TableChangeLogCommon<SstableInfo>>>
212 {
213 self.table_change_log.write()
214 }
215}
216
217pub(crate) async fn start_pinned_version_worker(
218 mut rx: UnboundedReceiver<PinVersionAction>,
219 hummock_meta_client: Arc<dyn HummockMetaClient>,
220 max_version_pinning_duration_sec: u64,
221) {
222 let min_execute_interval = Duration::from_millis(1000);
223 let max_retry_interval = Duration::from_secs(10);
224 let get_backoff_strategy = || {
225 tokio_retry::strategy::ExponentialBackoff::from_millis(10)
226 .max_delay(max_retry_interval)
227 .map(jitter)
228 };
229 let mut retry_backoff = get_backoff_strategy();
230 let mut min_execute_interval_tick = tokio::time::interval(min_execute_interval);
231 min_execute_interval_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
232 let mut need_unpin = false;
233
234 let mut version_ids_in_use: BTreeMap<HummockVersionId, (usize, Instant)> = BTreeMap::new();
235 let max_version_pinning_duration_sec = Duration::from_secs(max_version_pinning_duration_sec);
236 loop {
238 min_execute_interval_tick.tick().await;
239 while version_ids_in_use.len() > 1
241 && let Some(e) = version_ids_in_use.first_entry()
242 {
243 if e.get().1.elapsed() < max_version_pinning_duration_sec {
244 break;
245 }
246 need_unpin = true;
247 e.remove();
248 }
249
250 let mut versions_to_unpin = vec![];
252 let inst = Instant::now();
253 'collect: loop {
254 match rx.try_recv() {
255 Ok(version_action) => match version_action {
256 PinVersionAction::Pin(version_id) => {
257 version_ids_in_use
258 .entry(version_id)
259 .and_modify(|e| {
260 e.0 += 1;
261 e.1 = inst;
262 })
263 .or_insert((1, inst));
264 }
265 PinVersionAction::Unpin(version_id) => {
266 versions_to_unpin.push(version_id);
267 }
268 },
269 Err(err) => match err {
270 TryRecvError::Empty => {
271 break 'collect;
272 }
273 TryRecvError::Disconnected => {
274 tracing::info!("Shutdown hummock unpin worker");
275 return;
276 }
277 },
278 }
279 }
280 if !versions_to_unpin.is_empty() {
281 need_unpin = true;
282 }
283 if !need_unpin {
284 continue;
285 }
286
287 for version in &versions_to_unpin {
288 match version_ids_in_use.get_mut(version) {
289 Some((counter, _)) => {
290 *counter -= 1;
291 if *counter == 0 {
292 version_ids_in_use.remove(version);
293 }
294 }
295 None => tracing::warn!(
296 "version {} to unpin does not exist, may already be unpinned due to expiration",
297 version
298 ),
299 }
300 }
301
302 match version_ids_in_use.first_entry() {
303 Some(unpin_before) => {
304 match hummock_meta_client
306 .unpin_version_before(*unpin_before.key())
307 .await
308 {
309 Ok(_) => {
310 versions_to_unpin.clear();
311 need_unpin = false;
312 retry_backoff = get_backoff_strategy();
313 }
314 Err(err) => {
315 let retry_after = retry_backoff.next().unwrap_or(max_retry_interval);
316 tracing::warn!(
317 error = %err.as_report(),
318 "Failed to unpin version. Will retry after about {} milliseconds",
319 retry_after.as_millis()
320 );
321 tokio::time::sleep(retry_after).await;
322 }
323 }
324 }
325 None => tracing::warn!("version_ids_in_use is empty!"),
326 }
327 }
328}