risingwave_storage/hummock/local_version/pinned_version.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::iter::empty;
17use std::ops::Deref;
18use std::sync::{Arc, LazyLock};
19use std::time::{Duration, Instant};
20
21use auto_enums::auto_enum;
22use risingwave_common::catalog::TableId;
23use risingwave_common::log::LogSuppressor;
24use risingwave_hummock_sdk::level::{Level, Levels};
25use risingwave_hummock_sdk::version::{HummockVersion, LocalHummockVersion};
26use risingwave_hummock_sdk::{CompactionGroupId, HummockVersionId, INVALID_VERSION_ID};
27use risingwave_rpc_client::HummockMetaClient;
28use thiserror_ext::AsReport;
29use tokio::sync::mpsc::error::TryRecvError;
30use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
31use tokio_retry::strategy::jitter;
32
33#[derive(Debug, Clone)]
34pub enum PinVersionAction {
35    Pin(HummockVersionId),
36    Unpin(HummockVersionId),
37}
38
39struct PinnedVersionGuard {
40    version_id: HummockVersionId,
41    pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
42}
43
44impl PinnedVersionGuard {
45    /// Creates a new `PinnedVersionGuard` and send a pin request to `pinned_version_worker`.
46    fn new(
47        version_id: HummockVersionId,
48        pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
49    ) -> Self {
50        if pinned_version_manager_tx
51            .send(PinVersionAction::Pin(version_id))
52            .is_err()
53        {
54            tracing::warn!("failed to send req pin version id{}", version_id);
55        }
56
57        Self {
58            version_id,
59            pinned_version_manager_tx,
60        }
61    }
62}
63
64impl Drop for PinnedVersionGuard {
65    fn drop(&mut self) {
66        if self
67            .pinned_version_manager_tx
68            .send(PinVersionAction::Unpin(self.version_id))
69            .is_err()
70        {
71            static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
72                LazyLock::new(|| LogSuppressor::per_second(1));
73            if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
74                tracing::warn!(
75                    suppressed_count,
76                    version_id = %self.version_id,
77                    "failed to send req unpin"
78                );
79            }
80        }
81    }
82}
83
84#[derive(Clone)]
85pub struct PinnedVersion {
86    version: Arc<LocalHummockVersion>,
87    guard: Arc<PinnedVersionGuard>,
88}
89
90impl Deref for PinnedVersion {
91    type Target = LocalHummockVersion;
92
93    fn deref(&self) -> &Self::Target {
94        &self.version
95    }
96}
97
98impl PinnedVersion {
99    pub fn new(
100        version: HummockVersion,
101        pinned_version_manager_tx: UnboundedSender<PinVersionAction>,
102    ) -> Self {
103        let version_id = version.id;
104        let local_version = LocalHummockVersion::from(version);
105        PinnedVersion {
106            guard: Arc::new(PinnedVersionGuard::new(
107                version_id,
108                pinned_version_manager_tx,
109            )),
110            version: Arc::new(local_version),
111        }
112    }
113
114    pub fn new_pin_version(&self, version: HummockVersion) -> Option<Self> {
115        assert!(
116            version.id >= self.version.id,
117            "pinning a older version {}. Current is {}",
118            version.id,
119            self.version.id
120        );
121        if version.id == self.version.id {
122            return None;
123        }
124        let version_id = version.id;
125        let local_version = LocalHummockVersion::from(version);
126        Some(PinnedVersion {
127            guard: Arc::new(PinnedVersionGuard::new(
128                version_id,
129                self.guard.pinned_version_manager_tx.clone(),
130            )),
131            version: Arc::new(local_version),
132        })
133    }
134
135    /// Create a new `PinnedVersion` with the given `LocalHummockVersion`. Referring to the usage in the `hummock_event_handler`.
136    pub fn new_with_local_version(&self, version: LocalHummockVersion) -> Option<Self> {
137        assert!(
138            version.id >= self.version.id,
139            "pinning a older version {}. Current is {}",
140            version.id,
141            self.version.id
142        );
143        if version.id == self.version.id {
144            return None;
145        }
146
147        let version_id = version.id;
148
149        Some(PinnedVersion {
150            guard: Arc::new(PinnedVersionGuard::new(
151                version_id,
152                self.guard.pinned_version_manager_tx.clone(),
153            )),
154            version: Arc::new(version),
155        })
156    }
157
158    pub fn id(&self) -> HummockVersionId {
159        self.version.id
160    }
161
162    pub fn is_valid(&self) -> bool {
163        self.version.id != INVALID_VERSION_ID
164    }
165
166    fn levels_by_compaction_groups_id(&self, compaction_group_id: CompactionGroupId) -> &Levels {
167        self.version
168            .levels
169            .get(&compaction_group_id)
170            .unwrap_or_else(|| {
171                panic!(
172                    "levels for compaction group {} not found in version {}",
173                    compaction_group_id,
174                    self.id()
175                )
176            })
177    }
178
179    pub fn levels(&self, table_id: TableId) -> impl Iterator<Item = &Level> {
180        #[auto_enum(Iterator)]
181        match self.version.state_table_info.info().get(&table_id) {
182            Some(info) => {
183                let compaction_group_id = info.compaction_group_id;
184                let levels = self.levels_by_compaction_groups_id(compaction_group_id);
185                levels
186                    .l0
187                    .sub_levels
188                    .iter()
189                    .rev()
190                    .chain(levels.levels.iter())
191            }
192            None => empty(),
193        }
194    }
195}
196
197pub(crate) async fn start_pinned_version_worker(
198    mut rx: UnboundedReceiver<PinVersionAction>,
199    hummock_meta_client: Arc<dyn HummockMetaClient>,
200    max_version_pinning_duration_sec: u64,
201) {
202    let min_execute_interval = Duration::from_millis(1000);
203    let max_retry_interval = Duration::from_secs(10);
204    let get_backoff_strategy = || {
205        tokio_retry::strategy::ExponentialBackoff::from_millis(10)
206            .max_delay(max_retry_interval)
207            .map(jitter)
208    };
209    let mut retry_backoff = get_backoff_strategy();
210    let mut min_execute_interval_tick = tokio::time::interval(min_execute_interval);
211    min_execute_interval_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
212    let mut need_unpin = false;
213
214    let mut version_ids_in_use: BTreeMap<HummockVersionId, (usize, Instant)> = BTreeMap::new();
215    let max_version_pinning_duration_sec = Duration::from_secs(max_version_pinning_duration_sec);
216    // For each run in the loop, accumulate versions to unpin and call unpin RPC once.
217    loop {
218        min_execute_interval_tick.tick().await;
219        // 0. Expire versions.
220        while version_ids_in_use.len() > 1
221            && let Some(e) = version_ids_in_use.first_entry()
222        {
223            if e.get().1.elapsed() < max_version_pinning_duration_sec {
224                break;
225            }
226            need_unpin = true;
227            e.remove();
228        }
229
230        // 1. Collect new versions to unpin.
231        let mut versions_to_unpin = vec![];
232        let inst = Instant::now();
233        'collect: loop {
234            match rx.try_recv() {
235                Ok(version_action) => match version_action {
236                    PinVersionAction::Pin(version_id) => {
237                        version_ids_in_use
238                            .entry(version_id)
239                            .and_modify(|e| {
240                                e.0 += 1;
241                                e.1 = inst;
242                            })
243                            .or_insert((1, inst));
244                    }
245                    PinVersionAction::Unpin(version_id) => {
246                        versions_to_unpin.push(version_id);
247                    }
248                },
249                Err(err) => match err {
250                    TryRecvError::Empty => {
251                        break 'collect;
252                    }
253                    TryRecvError::Disconnected => {
254                        tracing::info!("Shutdown hummock unpin worker");
255                        return;
256                    }
257                },
258            }
259        }
260        if !versions_to_unpin.is_empty() {
261            need_unpin = true;
262        }
263        if !need_unpin {
264            continue;
265        }
266
267        for version in &versions_to_unpin {
268            match version_ids_in_use.get_mut(version) {
269                Some((counter, _)) => {
270                    *counter -= 1;
271                    if *counter == 0 {
272                        version_ids_in_use.remove(version);
273                    }
274                }
275                None => tracing::warn!(
276                    "version {} to unpin does not exist, may already be unpinned due to expiration",
277                    version
278                ),
279            }
280        }
281
282        match version_ids_in_use.first_entry() {
283            Some(unpin_before) => {
284                // 2. Call unpin RPC, including versions failed to unpin in previous RPC calls.
285                match hummock_meta_client
286                    .unpin_version_before(*unpin_before.key())
287                    .await
288                {
289                    Ok(_) => {
290                        versions_to_unpin.clear();
291                        need_unpin = false;
292                        retry_backoff = get_backoff_strategy();
293                    }
294                    Err(err) => {
295                        let retry_after = retry_backoff.next().unwrap_or(max_retry_interval);
296                        tracing::warn!(
297                            error = %err.as_report(),
298                            "Failed to unpin version. Will retry after about {} milliseconds",
299                            retry_after.as_millis()
300                        );
301                        tokio::time::sleep(retry_after).await;
302                    }
303                }
304            }
305            None => tracing::warn!("version_ids_in_use is empty!"),
306        }
307    }
308}