risingwave_storage/hummock/local_version/
pinned_version.rs1use std::collections::BTreeMap;
16use std::iter::empty;
17use std::ops::Deref;
18use std::sync::{Arc, LazyLock};
19use std::time::{Duration, Instant};
20
21use auto_enums::auto_enum;
22use risingwave_common::catalog::TableId;
23use risingwave_common::log::LogSuppressor;
24use risingwave_hummock_sdk::level::{Level, Levels};
25use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
26use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
27use risingwave_rpc_client::HummockMetaClient;
28use thiserror_ext::AsReport;
29use tokio::sync::mpsc::error::TryRecvError;
30use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
31use tokio_retry::strategy::jitter;
32
33#[derive(Debug, Clone)]
34pub enum PinVersionAction {
35 Pin(HummockVersionId),
36 Unpin(HummockVersionId),
37}
38
39struct PinnedVersionGuard {
40 version_id: HummockVersionId,
41 pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
42}
43
44impl PinnedVersionGuard {
45 fn new(
47 version_id: HummockVersionId,
48 pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
49 ) -> Self {
50 if pinned_version_manager_tx
51 .send(PinVersionAction::Pin(version_id))
52 .is_err()
53 {
54 tracing::warn!("failed to send req pin version id{}", version_id);
55 }
56
57 Self {
58 version_id,
59 pinned_version_manager_tx,
60 }
61 }
62}
63
64impl Drop for PinnedVersionGuard {
65 fn drop(&mut self) {
66 if self
67 .pinned_version_manager_tx
68 .send(PinVersionAction::Unpin(self.version_id))
69 .is_err()
70 {
71 static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
72 LazyLock::new(|| LogSuppressor::per_second(1));
73 if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
74 tracing::warn!(
75 suppressed_count,
76 version_id = %self.version_id,
77 "failed to send req unpin"
78 );
79 }
80 }
81 }
82}
83
84#[derive(Clone)]
85pub struct PinnedVersion {
86 version: Arc<LocalHummockVersion>,
87 guard: Arc<PinnedVersionGuard>,
88}
89
90impl Deref for PinnedVersion {
91 type Target = LocalHummockVersion;
92
93 fn deref(&self) -> &Self::Target {
94 &self.version
95 }
96}
97
98impl PinnedVersion {
99 pub fn new(
100 version: HummockVersion,
101 pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
102 ) -> Self {
103 let version_id = version.id;
104 let local_version = LocalHummockVersion::from(version);
105 PinnedVersion {
106 guard: Arc::new(PinnedVersionGuard::new(
107 version_id,
108 pinned_version_manager_tx,
109 )),
110 version: Arc::new(local_version),
111 }
112 }
113
114 pub fn new_pin_version(&self, version: HummockVersion) -> Option<Self> {
115 assert!(
116 version.id >= self.version.id,
117 "pinning a older version {}. Current is {}",
118 version.id,
119 self.version.id
120 );
121 if version.id == self.version.id {
122 return None;
123 }
124 let version_id = version.id;
125 let local_version = LocalHummockVersion::from(version);
126 Some(PinnedVersion {
127 guard: Arc::new(PinnedVersionGuard::new(
128 version_id,
129 self.guard.pinned_version_manager_tx.clone(),
130 )),
131 version: Arc::new(local_version),
132 })
133 }
134
135 pub fn new_with_local_version(&self, version: LocalHummockVersion) -> Option<Self> {
137 assert!(
138 version.id >= self.version.id,
139 "pinning a older version {}. Current is {}",
140 version.id,
141 self.version.id
142 );
143 if version.id == self.version.id {
144 return None;
145 }
146
147 let version_id = version.id;
148
149 Some(PinnedVersion {
150 guard: Arc::new(PinnedVersionGuard::new(
151 version_id,
152 self.guard.pinned_version_manager_tx.clone(),
153 )),
154 version: Arc::new(version),
155 })
156 }
157
158 pub fn id(&self) -> HummockVersionId {
159 self.version.id
160 }
161
162 pub fn is_valid(&self) -> bool {
163 self.version.id != INVALID_VERSION_ID
164 }
165
166 fn levels_by_compaction_groups_id(&self, compaction_group_id: CompactionGroupId) -> &Levels {
167 self.version
168 .levels
169 .get(&compaction_group_id)
170 .unwrap_or_else(|| {
171 panic!(
172 "levels for compaction group {} not found in version {}",
173 compaction_group_id,
174 self.id()
175 )
176 })
177 }
178
179 pub fn levels(&self, table_id: TableId) -> impl Iterator<Item = &Level> {
180 #[auto_enum(Iterator)]
181 match self.version.state_table_info.info().get(&table_id) {
182 Some(info) => {
183 let compaction_group_id = info.compaction_group_id;
184 let levels = self.levels_by_compaction_groups_id(compaction_group_id);
185 levels
186 .l0
187 .sub_levels
188 .iter()
189 .rev()
190 .chain(levels.levels.iter())
191 }
192 None => empty(),
193 }
194 }
195}
196
197pub(crate) async fn start_pinned_version_worker(
198 mut rx: UnboundedReceiver<PinVersionAction>,
199 hummock_meta_client: Arc<dyn HummockMetaClient>,
200 max_version_pinning_duration_sec: u64,
201) {
202 let min_execute_interval = Duration::from_millis(1000);
203 let max_retry_interval = Duration::from_secs(10);
204 let get_backoff_strategy = || {
205 tokio_retry::strategy::ExponentialBackoff::from_millis(10)
206 .max_delay(max_retry_interval)
207 .map(jitter)
208 };
209 let mut retry_backoff = get_backoff_strategy();
210 let mut min_execute_interval_tick = tokio::time::interval(min_execute_interval);
211 min_execute_interval_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
212 let mut need_unpin = false;
213
214 let mut version_ids_in_use: BTreeMap<HummockVersionId, (usize, Instant)> = BTreeMap::new();
215 let max_version_pinning_duration_sec = Duration::from_secs(max_version_pinning_duration_sec);
216 loop {
218 min_execute_interval_tick.tick().await;
219 while version_ids_in_use.len() > 1
221 && let Some(e) = version_ids_in_use.first_entry()
222 {
223 if e.get().1.elapsed() < max_version_pinning_duration_sec {
224 break;
225 }
226 need_unpin = true;
227 e.remove();
228 }
229
230 let mut versions_to_unpin = vec![];
232 let inst = Instant::now();
233 'collect: loop {
234 match rx.try_recv() {
235 Ok(version_action) => match version_action {
236 PinVersionAction::Pin(version_id) => {
237 version_ids_in_use
238 .entry(version_id)
239 .and_modify(|e| {
240 e.0 += 1;
241 e.1 = inst;
242 })
243 .or_insert((1, inst));
244 }
245 PinVersionAction::Unpin(version_id) => {
246 versions_to_unpin.push(version_id);
247 }
248 },
249 Err(err) => match err {
250 TryRecvError::Empty => {
251 break 'collect;
252 }
253 TryRecvError::Disconnected => {
254 tracing::info!("Shutdown hummock unpin worker");
255 return;
256 }
257 },
258 }
259 }
260 if !versions_to_unpin.is_empty() {
261 need_unpin = true;
262 }
263 if !need_unpin {
264 continue;
265 }
266
267 for version in &versions_to_unpin {
268 match version_ids_in_use.get_mut(version) {
269 Some((counter, _)) => {
270 *counter -= 1;
271 if *counter == 0 {
272 version_ids_in_use.remove(version);
273 }
274 }
275 None => tracing::warn!(
276 "version {} to unpin does not exist, may already be unpinned due to expiration",
277 version
278 ),
279 }
280 }
281
282 match version_ids_in_use.first_entry() {
283 Some(unpin_before) => {
284 match hummock_meta_client
286 .unpin_version_before(*unpin_before.key())
287 .await
288 {
289 Ok(_) => {
290 versions_to_unpin.clear();
291 need_unpin = false;
292 retry_backoff = get_backoff_strategy();
293 }
294 Err(err) => {
295 let retry_after = retry_backoff.next().unwrap_or(max_retry_interval);
296 tracing::warn!(
297 error = %err.as_report(),
298 "Failed to unpin version. Will retry after about {} milliseconds",
299 retry_after.as_millis()
300 );
301 tokio::time::sleep(retry_after).await;
302 }
303 }
304 }
305 None => tracing::warn!("version_ids_in_use is empty!"),
306 }
307 }
308}