risingwave_storage/hummock/local_version/
pinned_version.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::iter::empty;
17use std::ops::Deref;
18use std::sync::{Arc, LazyLock};
19use std::time::{Duration, Instant};
20
21use auto_enums::auto_enum;
22use parking_lot::RwLock;
23use risingwave_common::catalog::TableId;
24use risingwave_common::log::LogSuppressor;
25use risingwave_hummock_sdk::change_log::TableChangeLogCommon;
26use risingwave_hummock_sdk::level::{Level, Levels};
27use risingwave_hummock_sdk::sstable_info::SstableInfo;
28use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
29use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
30use risingwave_rpc_client::HummockMetaClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc::error::TryRecvError;
33use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
34use tokio_retry::strategy::jitter;
35
/// Message sent over the channel consumed by `start_pinned_version_worker`.
#[derive(Debug, Clone)]
pub enum PinVersionAction {
    /// Registers one more local use of the given version id.
    Pin(HummockVersionId),
    /// Releases one local use of the given version id.
    Unpin(HummockVersionId),
}
41
/// Ties the pinned state of one version id to the lifetime of this value: a `Pin` action is
/// sent when the guard is created and an `Unpin` action is sent when it is dropped.
struct PinnedVersionGuard {
    /// Id of the version kept pinned by this guard.
    version_id: HummockVersionId,
    /// Channel on which the `Pin`/`Unpin` actions for `version_id` are sent.
    pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
}
46
47impl PinnedVersionGuard {
48    /// Creates a new `PinnedVersionGuard` and send a pin request to `pinned_version_worker`.
49    fn new(
50        version_id: HummockVersionId,
51        pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
52    ) -> Self {
53        if pinned_version_manager_tx
54            .send(PinVersionAction::Pin(version_id))
55            .is_err()
56        {
57            tracing::warn!("failed to send req pin version id{}", version_id);
58        }
59
60        Self {
61            version_id,
62            pinned_version_manager_tx,
63        }
64    }
65}
66
67impl Drop for PinnedVersionGuard {
68    fn drop(&mut self) {
69        if self
70            .pinned_version_manager_tx
71            .send(PinVersionAction::Unpin(self.version_id))
72            .is_err()
73        {
74            static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
75                LazyLock::new(|| LogSuppressor::per_second(1));
76            if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
77                tracing::warn!(
78                    suppressed_count,
79                    version_id = %self.version_id,
80                    "failed to send req unpin"
81                );
82            }
83        }
84    }
85}
86
/// A `LocalHummockVersion` bundled with the guard that keeps its version id pinned and with the
/// per-table change logs. Cloning is cheap: every field is an `Arc`.
#[derive(Clone)]
pub struct PinnedVersion {
    /// The version itself; exposed through `Deref`.
    version: Arc<LocalHummockVersion>,
    /// Shared by all clones, so the guard's `Drop` (which sends `Unpin`) runs only when the
    /// last clone is dropped.
    guard: Arc<PinnedVersionGuard>,
    /// Change logs keyed by table id. `new_with_local_version` reuses this same `Arc`, so the
    /// map can be shared between successive pinned versions.
    table_change_log: Arc<RwLock<HashMap<TableId, TableChangeLogCommon<SstableInfo>>>>,
}
93
94impl Deref for PinnedVersion {
95    type Target = LocalHummockVersion;
96
97    fn deref(&self) -> &Self::Target {
98        &self.version
99    }
100}
101
102impl PinnedVersion {
103    pub fn new(
104        version: HummockVersion,
105        pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
106    ) -> Self {
107        let version_id = version.id;
108        let (local_version, table_id_to_change_logs) = version.split_change_log();
109        PinnedVersion {
110            guard: Arc::new(PinnedVersionGuard::new(
111                version_id,
112                pinned_version_manager_tx,
113            )),
114            table_change_log: Arc::new(RwLock::new(table_id_to_change_logs)),
115            version: Arc::new(local_version),
116        }
117    }
118
119    pub fn new_pin_version(&self, version: HummockVersion) -> Option<Self> {
120        assert!(
121            version.id >= self.version.id,
122            "pinning a older version {}. Current is {}",
123            version.id,
124            self.version.id
125        );
126        if version.id == self.version.id {
127            return None;
128        }
129        let version_id = version.id;
130        let (local_version, table_id_to_change_logs) = version.split_change_log();
131        Some(PinnedVersion {
132            guard: Arc::new(PinnedVersionGuard::new(
133                version_id,
134                self.guard.pinned_version_manager_tx.clone(),
135            )),
136            table_change_log: Arc::new(RwLock::new(table_id_to_change_logs)),
137            version: Arc::new(local_version),
138        })
139    }
140
141    /// Create a new `PinnedVersion` with the given `LocalHummockVersion`. Referring to the usage in the `hummock_event_handler`.
142    pub fn new_with_local_version(&self, version: LocalHummockVersion) -> Option<Self> {
143        assert!(
144            version.id >= self.version.id,
145            "pinning a older version {}. Current is {}",
146            version.id,
147            self.version.id
148        );
149        if version.id == self.version.id {
150            return None;
151        }
152
153        let version_id = version.id;
154
155        Some(PinnedVersion {
156            guard: Arc::new(PinnedVersionGuard::new(
157                version_id,
158                self.guard.pinned_version_manager_tx.clone(),
159            )),
160            table_change_log: self.table_change_log.clone(),
161            version: Arc::new(version),
162        })
163    }
164
165    pub fn id(&self) -> HummockVersionId {
166        self.version.id
167    }
168
169    pub fn is_valid(&self) -> bool {
170        self.version.id != INVALID_VERSION_ID
171    }
172
173    fn levels_by_compaction_groups_id(&self, compaction_group_id: CompactionGroupId) -> &Levels {
174        self.version
175            .levels
176            .get(&compaction_group_id)
177            .unwrap_or_else(|| {
178                panic!(
179                    "levels for compaction group {} not found in version {}",
180                    compaction_group_id,
181                    self.id()
182                )
183            })
184    }
185
186    pub fn levels(&self, table_id: TableId) -> impl Iterator<Item = &Level> {
187        #[auto_enum(Iterator)]
188        match self.version.state_table_info.info().get(&table_id) {
189            Some(info) => {
190                let compaction_group_id = info.compaction_group_id;
191                let levels = self.levels_by_compaction_groups_id(compaction_group_id);
192                levels
193                    .l0
194                    .sub_levels
195                    .iter()
196                    .rev()
197                    .chain(levels.levels.iter())
198            }
199            None => empty(),
200        }
201    }
202
203    pub fn table_change_log_read_lock(
204        &self,
205    ) -> parking_lot::RwLockReadGuard<'_, HashMap<TableId, TableChangeLogCommon<SstableInfo>>> {
206        self.table_change_log.read()
207    }
208
209    pub fn table_change_log_write_lock(
210        &self,
211    ) -> parking_lot::RwLockWriteGuard<'_, HashMap<TableId, TableChangeLogCommon<SstableInfo>>>
212    {
213        self.table_change_log.write()
214    }
215}
216
217pub(crate) async fn start_pinned_version_worker(
218    mut rx: UnboundedReceiver<PinVersionAction>,
219    hummock_meta_client: Arc<dyn HummockMetaClient>,
220    max_version_pinning_duration_sec: u64,
221) {
222    let min_execute_interval = Duration::from_millis(1000);
223    let max_retry_interval = Duration::from_secs(10);
224    let get_backoff_strategy = || {
225        tokio_retry::strategy::ExponentialBackoff::from_millis(10)
226            .max_delay(max_retry_interval)
227            .map(jitter)
228    };
229    let mut retry_backoff = get_backoff_strategy();
230    let mut min_execute_interval_tick = tokio::time::interval(min_execute_interval);
231    min_execute_interval_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
232    let mut need_unpin = false;
233
234    let mut version_ids_in_use: BTreeMap<HummockVersionId, (usize, Instant)> = BTreeMap::new();
235    let max_version_pinning_duration_sec = Duration::from_secs(max_version_pinning_duration_sec);
236    // For each run in the loop, accumulate versions to unpin and call unpin RPC once.
237    loop {
238        min_execute_interval_tick.tick().await;
239        // 0. Expire versions.
240        while version_ids_in_use.len() > 1
241            && let Some(e) = version_ids_in_use.first_entry()
242        {
243            if e.get().1.elapsed() < max_version_pinning_duration_sec {
244                break;
245            }
246            need_unpin = true;
247            e.remove();
248        }
249
250        // 1. Collect new versions to unpin.
251        let mut versions_to_unpin = vec![];
252        let inst = Instant::now();
253        'collect: loop {
254            match rx.try_recv() {
255                Ok(version_action) => match version_action {
256                    PinVersionAction::Pin(version_id) => {
257                        version_ids_in_use
258                            .entry(version_id)
259                            .and_modify(|e| {
260                                e.0 += 1;
261                                e.1 = inst;
262                            })
263                            .or_insert((1, inst));
264                    }
265                    PinVersionAction::Unpin(version_id) => {
266                        versions_to_unpin.push(version_id);
267                    }
268                },
269                Err(err) => match err {
270                    TryRecvError::Empty => {
271                        break 'collect;
272                    }
273                    TryRecvError::Disconnected => {
274                        tracing::info!("Shutdown hummock unpin worker");
275                        return;
276                    }
277                },
278            }
279        }
280        if !versions_to_unpin.is_empty() {
281            need_unpin = true;
282        }
283        if !need_unpin {
284            continue;
285        }
286
287        for version in &versions_to_unpin {
288            match version_ids_in_use.get_mut(version) {
289                Some((counter, _)) => {
290                    *counter -= 1;
291                    if *counter == 0 {
292                        version_ids_in_use.remove(version);
293                    }
294                }
295                None => tracing::warn!(
296                    "version {} to unpin does not exist, may already be unpinned due to expiration",
297                    version
298                ),
299            }
300        }
301
302        match version_ids_in_use.first_entry() {
303            Some(unpin_before) => {
304                // 2. Call unpin RPC, including versions failed to unpin in previous RPC calls.
305                match hummock_meta_client
306                    .unpin_version_before(*unpin_before.key())
307                    .await
308                {
309                    Ok(_) => {
310                        versions_to_unpin.clear();
311                        need_unpin = false;
312                        retry_backoff = get_backoff_strategy();
313                    }
314                    Err(err) => {
315                        let retry_after = retry_backoff.next().unwrap_or(max_retry_interval);
316                        tracing::warn!(
317                            error = %err.as_report(),
318                            "Failed to unpin version. Will retry after about {} milliseconds",
319                            retry_after.as_millis()
320                        );
321                        tokio::time::sleep(retry_after).await;
322                    }
323                }
324            }
325            None => tracing::warn!("version_ids_in_use is empty!"),
326        }
327    }
328}