// risingwave_storage/hummock/local_version/pinned_version.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::iter::empty;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};

use auto_enums::auto_enum;
use parking_lot::RwLock;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::change_log::TableChangeLogCommon;
use risingwave_hummock_sdk::level::{Level, Levels};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
use risingwave_rpc_client::HummockMetaClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio_retry::strategy::jitter;

35#[derive(Debug, Clone)]
36pub enum PinVersionAction {
37    Pin(HummockVersionId),
38    Unpin(HummockVersionId),
39}
40
41struct PinnedVersionGuard {
42    version_id: HummockVersionId,
43    pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
44}
45
46impl PinnedVersionGuard {
47    /// Creates a new `PinnedVersionGuard` and send a pin request to `pinned_version_worker`.
48    fn new(
49        version_id: HummockVersionId,
50        pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
51    ) -> Self {
52        if pinned_version_manager_tx
53            .send(PinVersionAction::Pin(version_id))
54            .is_err()
55        {
56            tracing::warn!("failed to send req pin version id{}", version_id);
57        }
58
59        Self {
60            version_id,
61            pinned_version_manager_tx,
62        }
63    }
64}
65
66impl Drop for PinnedVersionGuard {
67    fn drop(&mut self) {
68        if self
69            .pinned_version_manager_tx
70            .send(PinVersionAction::Unpin(self.version_id))
71            .is_err()
72        {
73            tracing::warn!("failed to send req unpin version id: {}", self.version_id);
74        }
75    }
76}
77
78#[derive(Clone)]
79pub struct PinnedVersion {
80    version: Arc<LocalHummockVersion>,
81    guard: Arc<PinnedVersionGuard>,
82    table_change_log: Arc<RwLock<HashMap<TableId, TableChangeLogCommon<SstableInfo>>>>,
83}
84
85impl Deref for PinnedVersion {
86    type Target = LocalHummockVersion;
87
88    fn deref(&self) -> &Self::Target {
89        &self.version
90    }
91}
92
93impl PinnedVersion {
94    pub fn new(
95        version: HummockVersion,
96        pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
97    ) -> Self {
98        let version_id = version.id;
99        let (local_version, table_id_to_change_logs) = version.split_change_log();
100        PinnedVersion {
101            guard: Arc::new(PinnedVersionGuard::new(
102                version_id,
103                pinned_version_manager_tx,
104            )),
105            table_change_log: Arc::new(RwLock::new(table_id_to_change_logs)),
106            version: Arc::new(local_version),
107        }
108    }
109
110    pub fn new_pin_version(&self, version: HummockVersion) -> Option<Self> {
111        assert!(
112            version.id >= self.version.id,
113            "pinning a older version {}. Current is {}",
114            version.id,
115            self.version.id
116        );
117        if version.id == self.version.id {
118            return None;
119        }
120        let version_id = version.id;
121        let (local_version, table_id_to_change_logs) = version.split_change_log();
122        Some(PinnedVersion {
123            guard: Arc::new(PinnedVersionGuard::new(
124                version_id,
125                self.guard.pinned_version_manager_tx.clone(),
126            )),
127            table_change_log: Arc::new(RwLock::new(table_id_to_change_logs)),
128            version: Arc::new(local_version),
129        })
130    }
131
132    /// Create a new `PinnedVersion` with the given `LocalHummockVersion`. Referring to the usage in the `hummock_event_handler`.
133    pub fn new_with_local_version(&self, version: LocalHummockVersion) -> Option<Self> {
134        assert!(
135            version.id >= self.version.id,
136            "pinning a older version {}. Current is {}",
137            version.id,
138            self.version.id
139        );
140        if version.id == self.version.id {
141            return None;
142        }
143
144        let version_id = version.id;
145
146        Some(PinnedVersion {
147            guard: Arc::new(PinnedVersionGuard::new(
148                version_id,
149                self.guard.pinned_version_manager_tx.clone(),
150            )),
151            table_change_log: self.table_change_log.clone(),
152            version: Arc::new(version),
153        })
154    }
155
156    pub fn id(&self) -> HummockVersionId {
157        self.version.id
158    }
159
160    pub fn is_valid(&self) -> bool {
161        self.version.id != INVALID_VERSION_ID
162    }
163
164    fn levels_by_compaction_groups_id(&self, compaction_group_id: CompactionGroupId) -> &Levels {
165        self.version
166            .levels
167            .get(&compaction_group_id)
168            .unwrap_or_else(|| {
169                panic!(
170                    "levels for compaction group {} not found in version {}",
171                    compaction_group_id,
172                    self.id()
173                )
174            })
175    }
176
177    pub fn levels(&self, table_id: TableId) -> impl Iterator<Item = &Level> {
178        #[auto_enum(Iterator)]
179        match self.version.state_table_info.info().get(&table_id) {
180            Some(info) => {
181                let compaction_group_id = info.compaction_group_id;
182                let levels = self.levels_by_compaction_groups_id(compaction_group_id);
183                levels
184                    .l0
185                    .sub_levels
186                    .iter()
187                    .rev()
188                    .chain(levels.levels.iter())
189            }
190            None => empty(),
191        }
192    }
193
194    pub fn table_change_log_read_lock(
195        &self,
196    ) -> parking_lot::RwLockReadGuard<'_, HashMap<TableId, TableChangeLogCommon<SstableInfo>>> {
197        self.table_change_log.read()
198    }
199
200    pub fn table_change_log_write_lock(
201        &self,
202    ) -> parking_lot::RwLockWriteGuard<'_, HashMap<TableId, TableChangeLogCommon<SstableInfo>>>
203    {
204        self.table_change_log.write()
205    }
206}
207
208pub(crate) async fn start_pinned_version_worker(
209    mut rx: UnboundedReceiver<PinVersionAction>,
210    hummock_meta_client: Arc<dyn HummockMetaClient>,
211    max_version_pinning_duration_sec: u64,
212) {
213    let min_execute_interval = Duration::from_millis(1000);
214    let max_retry_interval = Duration::from_secs(10);
215    let get_backoff_strategy = || {
216        tokio_retry::strategy::ExponentialBackoff::from_millis(10)
217            .max_delay(max_retry_interval)
218            .map(jitter)
219    };
220    let mut retry_backoff = get_backoff_strategy();
221    let mut min_execute_interval_tick = tokio::time::interval(min_execute_interval);
222    min_execute_interval_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
223    let mut need_unpin = false;
224
225    let mut version_ids_in_use: BTreeMap<HummockVersionId, (usize, Instant)> = BTreeMap::new();
226    let max_version_pinning_duration_sec = Duration::from_secs(max_version_pinning_duration_sec);
227    // For each run in the loop, accumulate versions to unpin and call unpin RPC once.
228    loop {
229        min_execute_interval_tick.tick().await;
230        // 0. Expire versions.
231        while version_ids_in_use.len() > 1
232            && let Some(e) = version_ids_in_use.first_entry()
233        {
234            if e.get().1.elapsed() < max_version_pinning_duration_sec {
235                break;
236            }
237            need_unpin = true;
238            e.remove();
239        }
240
241        // 1. Collect new versions to unpin.
242        let mut versions_to_unpin = vec![];
243        let inst = Instant::now();
244        'collect: loop {
245            match rx.try_recv() {
246                Ok(version_action) => match version_action {
247                    PinVersionAction::Pin(version_id) => {
248                        version_ids_in_use
249                            .entry(version_id)
250                            .and_modify(|e| {
251                                e.0 += 1;
252                                e.1 = inst;
253                            })
254                            .or_insert((1, inst));
255                    }
256                    PinVersionAction::Unpin(version_id) => {
257                        versions_to_unpin.push(version_id);
258                    }
259                },
260                Err(err) => match err {
261                    TryRecvError::Empty => {
262                        break 'collect;
263                    }
264                    TryRecvError::Disconnected => {
265                        tracing::info!("Shutdown hummock unpin worker");
266                        return;
267                    }
268                },
269            }
270        }
271        if !versions_to_unpin.is_empty() {
272            need_unpin = true;
273        }
274        if !need_unpin {
275            continue;
276        }
277
278        for version in &versions_to_unpin {
279            match version_ids_in_use.get_mut(version) {
280                Some((counter, _)) => {
281                    *counter -= 1;
282                    if *counter == 0 {
283                        version_ids_in_use.remove(version);
284                    }
285                }
286                None => tracing::warn!(
287                    "version {} to unpin does not exist, may already be unpinned due to expiration",
288                    version
289                ),
290            }
291        }
292
293        match version_ids_in_use.first_entry() {
294            Some(unpin_before) => {
295                // 2. Call unpin RPC, including versions failed to unpin in previous RPC calls.
296                match hummock_meta_client
297                    .unpin_version_before(*unpin_before.key())
298                    .await
299                {
300                    Ok(_) => {
301                        versions_to_unpin.clear();
302                        need_unpin = false;
303                        retry_backoff = get_backoff_strategy();
304                    }
305                    Err(err) => {
306                        let retry_after = retry_backoff.next().unwrap_or(max_retry_interval);
307                        tracing::warn!(
308                            error = %err.as_report(),
309                            "Failed to unpin version. Will retry after about {} milliseconds",
310                            retry_after.as_millis()
311                        );
312                        tokio::time::sleep(retry_after).await;
313                    }
314                }
315            }
316            None => tracing::warn!("version_ids_in_use is empty!"),
317        }
318    }
319}