// risingwave_meta/hummock/manager/context.rs
use std::collections::{BTreeMap, HashMap, HashSet};
16
17use fail::fail_point;
18use futures::{StreamExt, stream};
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_hummock_sdk::version::HummockVersion;
22use risingwave_hummock_sdk::{
23 HummockContextId, HummockSstableObjectId, HummockVersionId, INVALID_VERSION_ID,
24 LocalSstableInfo,
25};
26use risingwave_meta_model::hummock_gc_history;
27use risingwave_pb::hummock::{HummockPinnedVersion, ValidationTask};
28use sea_orm::{DatabaseConnection, EntityTrait};
29
30use crate::controller::SqlMetaStore;
31use crate::hummock::HummockManager;
32use crate::hummock::error::{Error, Result};
33use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender};
34use crate::hummock::manager::{commit_multi_var, start_measure_real_process_timer};
35use crate::hummock::metrics_utils::trigger_pin_unpin_version_state;
36use crate::manager::{META_NODE_ID, MetadataManager};
37use crate::model::BTreeMapTransaction;
38use crate::rpc::metrics::MetaMetrics;
39
/// A guard protecting Hummock version `id`.
///
/// Registered via `HummockManager::register_safe_point`, which records `id` in
/// `ContextInfo::version_safe_points`. Dropping this struct sends a
/// `DropSafePoint` event to the manager worker to release the safe point
/// (see the `Drop` impl below).
pub struct HummockVersionSafePoint {
    /// The version id this safe point protects.
    pub id: HummockVersionId,
    /// Channel to the manager worker, used to release the safe point on drop.
    event_sender: HummockManagerEventSender,
}
46
47impl Drop for HummockVersionSafePoint {
48 fn drop(&mut self) {
49 if self
50 .event_sender
51 .send(HummockManagerEvent::DropSafePoint(self.id))
52 .is_err()
53 {
54 tracing::debug!("failed to drop hummock version safe point {}", self.id);
55 }
56 }
57}
58
/// Per-context bookkeeping guarded by the manager's `context_info` lock.
#[derive(Default)]
pub(super) struct ContextInfo {
    // Version pinned by each context, keyed by context id.
    pub pinned_versions: BTreeMap<HummockContextId, HummockPinnedVersion>,
    // Version ids protected by live `HummockVersionSafePoint` guards; may
    // contain duplicates when the same version is registered more than once.
    pub version_safe_points: Vec<HummockVersionId>,
}
65
impl ContextInfo {
    /// Removes the pinned versions of `context_ids` from the in-memory map and
    /// commits the removal to the meta store in a single transaction.
    ///
    /// Defined on `ContextInfo` (rather than `HummockManager`) so callers that
    /// already hold the `context_info` write lock can invoke it directly.
    async fn release_contexts(
        &mut self,
        context_ids: impl AsRef<[HummockContextId]>,
        meta_store_ref: SqlMetaStore,
    ) -> Result<()> {
        // Failure-injection points used by tests.
        fail_point!("release_contexts_metastore_err", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));
        fail_point!("release_contexts_internal_err", |_| Err(Error::Internal(
            anyhow::anyhow!("failpoint internal error")
        )));

        // Stage the removals in a transaction; `commit_multi_var!` persists
        // them and applies them to `self.pinned_versions` on success.
        let mut pinned_versions = BTreeMapTransaction::new(&mut self.pinned_versions);
        for context_id in context_ids.as_ref() {
            pinned_versions.remove(*context_id);
        }
        commit_multi_var!(meta_store_ref, pinned_versions)?;

        Ok(())
    }
}
91
92impl HummockManager {
93 pub async fn release_contexts(
94 &self,
95 context_ids: impl AsRef<[HummockContextId]>,
96 ) -> Result<()> {
97 let mut context_info = self.context_info.write().await;
98 context_info
99 .release_contexts(context_ids, self.env.meta_store())
100 .await?;
101 #[cfg(test)]
102 {
103 drop(context_info);
104 self.check_state_consistency().await;
105 }
106 Ok(())
107 }
108
109 pub async fn check_context(&self, context_id: HummockContextId) -> Result<bool> {
111 self.context_info
112 .read()
113 .await
114 .check_context(context_id, &self.metadata_manager)
115 .await
116 }
117
118 async fn check_context_with_meta_node(
119 &self,
120 context_id: HummockContextId,
121 context_info: &ContextInfo,
122 ) -> Result<()> {
123 if context_id == META_NODE_ID {
124 } else if !context_info
126 .check_context(context_id, &self.metadata_manager)
127 .await?
128 {
129 return Err(Error::InvalidContext(context_id));
131 }
132 Ok(())
133 }
134
135 #[cfg(any(test, feature = "test"))]
136 pub async fn get_min_pinned_version_id(&self) -> HummockVersionId {
137 self.context_info.read().await.min_pinned_version_id()
138 }
139}
140
141impl ContextInfo {
142 pub(super) async fn check_context(
146 &self,
147 context_id: HummockContextId,
148 metadata_manager: &MetadataManager,
149 ) -> Result<bool> {
150 Ok(metadata_manager
151 .get_worker_by_id(context_id as _)
152 .await
153 .map_err(|err| Error::MetaStore(err.into()))?
154 .is_some())
155 }
156}
157
impl HummockManager {
    /// Finds contexts that hold resources (compaction task assignments or
    /// pinned versions) but whose worker no longer exists, releases them, and
    /// returns the released context ids.
    pub(super) async fn release_invalid_contexts(&self) -> Result<Vec<HummockContextId>> {
        let (active_context_ids, mut context_info) = {
            let compaction_guard = self.compaction.read().await;
            let context_info = self.context_info.write().await;
            let _timer = start_measure_real_process_timer!(self, "release_invalid_contexts");
            // Collect every context currently holding a resource.
            let mut active_context_ids = HashSet::new();
            active_context_ids.extend(
                compaction_guard
                    .compact_task_assignment
                    .values()
                    .map(|c| c.context_id),
            );
            active_context_ids.extend(context_info.pinned_versions.keys());
            // The compaction read guard ends here; the context_info write guard
            // is moved out and kept for the rest of the function.
            (active_context_ids, context_info)
        };

        // A context is invalid when its worker is no longer registered.
        let mut invalid_context_ids = vec![];
        for active_context_id in &active_context_ids {
            if !context_info
                .check_context(*active_context_id, &self.metadata_manager)
                .await?
            {
                invalid_context_ids.push(*active_context_id);
            }
        }

        context_info
            .release_contexts(&invalid_context_ids, self.env.meta_store())
            .await?;

        Ok(invalid_context_ids)
    }

    /// Validates a commit-epoch request before it is applied:
    /// - every SST must come from a context that is still valid,
    /// - each table's committed epoch must strictly advance,
    /// - SSTs must pass the retention-watermark and GC-history checks,
    /// - optionally dispatches an asynchronous validation task to a compactor.
    pub async fn commit_epoch_sanity_check(
        &self,
        tables_to_commit: &HashMap<TableId, u64>,
        sstables: &[LocalSstableInfo],
        sst_to_context: &HashMap<HummockSstableObjectId, HummockContextId>,
        current_version: &HummockVersion,
    ) -> Result<()> {
        use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;

        // Reject SSTs produced by contexts that are no longer valid.
        for (sst_id, context_id) in sst_to_context {
            #[cfg(test)]
            {
                // Tests commit as the meta node itself; skip the worker lookup.
                if *context_id == crate::manager::META_NODE_ID {
                    continue;
                }
            }
            if !self
                .context_info
                .read()
                .await
                .check_context(*context_id, &self.metadata_manager)
                .await?
            {
                return Err(Error::InvalidSst(*sst_id));
            }
        }

        // Committed epochs must be strictly increasing per table.
        for (table_id, committed_epoch) in tables_to_commit {
            if let Some(info) = current_version.state_table_info.info().get(table_id) {
                if *committed_epoch <= info.committed_epoch {
                    return Err(anyhow::anyhow!(
                        "table {} Epoch {} <= committed_epoch {}",
                        table_id,
                        committed_epoch,
                        info.committed_epoch,
                    )
                    .into());
                }
            }
        }

        if !sstables.is_empty() {
            let now = self.now().await?;
            // Objects created before `now - min_sst_retention_time_sec` may
            // already have been garbage-collected; reject them.
            check_sst_retention(
                now,
                self.env.opts.min_sst_retention_time_sec,
                sstables
                    .iter()
                    .map(|s| (s.sst_info.object_id, s.created_at)),
            )?;
            // When GC history is retained, also reject objects recorded as GCed.
            if self.env.opts.gc_history_retention_time_sec != 0 {
                let ids = sstables.iter().map(|s| s.sst_info.object_id).collect_vec();
                check_gc_history(&self.meta_store_ref().conn, ids).await?;
            }
        }

        // Best-effort: ask a compactor to validate the committed SSTs. Skipped
        // (with a warning) when disabled, when there is nothing to validate, or
        // when no compactor is available; a failed send is not an error.
        async {
            if !self.env.opts.enable_committed_sst_sanity_check {
                return;
            }
            if sstables.is_empty() {
                return;
            }
            let compactor = match self.compactor_manager.next_compactor() {
                None => {
                    tracing::warn!("Skip committed SST sanity check due to no available worker");
                    return;
                }
                Some(compactor) => compactor,
            };
            let sst_infos = sstables
                .iter()
                .map(|LocalSstableInfo { sst_info, .. }| sst_info.clone())
                .collect_vec();
            if compactor
                .send_event(ResponseEvent::ValidationTask(ValidationTask {
                    sst_infos: sst_infos.into_iter().map(|sst| sst.into()).collect_vec(),
                    sst_id_to_worker_id: sst_to_context
                        .iter()
                        .map(|(object_id, worker_id)| (object_id.inner(), *worker_id))
                        .collect(),
                }))
                .is_err()
            {
                tracing::warn!("Skip committed SST sanity check due to send failure");
            }
        }
        .await;
        Ok(())
    }

    /// Releases the meta node's own context.
    pub async fn release_meta_context(&self) -> Result<()> {
        self.release_contexts([META_NODE_ID]).await
    }

    /// Validates objects referenced by a reported compaction: they must be
    /// within the retention watermark and absent from GC history.
    pub(crate) async fn report_compaction_sanity_check(
        &self,
        object_timestamps: &HashMap<HummockSstableObjectId, u64>,
    ) -> Result<()> {
        if object_timestamps.is_empty() {
            return Ok(());
        }
        let now = self.now().await?;
        check_sst_retention(
            now,
            self.env.opts.min_sst_retention_time_sec,
            object_timestamps.iter().map(|(k, v)| (*k, *v)),
        )?;
        if self.env.opts.gc_history_retention_time_sec != 0 {
            let ids = object_timestamps.iter().map(|(id, _)| *id).collect_vec();
            check_gc_history(&self.meta_store_ref().conn, ids).await?;
        }
        Ok(())
    }
}
312
313fn check_sst_retention(
314 now: u64,
315 retention_sec: u64,
316 sst_infos: impl Iterator<Item = (HummockSstableObjectId, u64)>,
317) -> Result<()> {
318 let sst_retention_watermark = now.saturating_sub(retention_sec);
319 for (object_id, created_at) in sst_infos {
320 if created_at < sst_retention_watermark {
321 return Err(anyhow::anyhow!("object {object_id} is rejected from being committed since it's below watermark: object timestamp {created_at}, meta node timestamp {now}, retention_sec {retention_sec}, watermark {sst_retention_watermark}").into());
322 }
323 }
324 Ok(())
325}
326
/// Returns an error if any of `object_ids` appears in the GC history table,
/// i.e. the object was already garbage-collected and must not be committed again.
async fn check_gc_history(
    db: &DatabaseConnection,
    object_ids: impl IntoIterator<Item = HummockSstableObjectId>,
) -> Result<()> {
    // Look up each id in the GC history, at most 10 queries in flight at once.
    let futures = object_ids.into_iter().map(|id| async move {
        let id: risingwave_meta_model::HummockSstableObjectId = id.inner().try_into().unwrap();
        hummock_gc_history::Entity::find_by_id(id)
            .one(db)
            .await
            .map_err(Error::from)
    });
    let res: Vec<_> = stream::iter(futures).buffer_unordered(10).collect().await;
    // Short-circuit on the first query error.
    let res: Result<Vec<_>> = res.into_iter().collect();
    // `flatten` drops the `None`s: only ids found in GC history remain.
    let mut expired_object_ids = res?.into_iter().flatten().peekable();
    if expired_object_ids.peek().is_none() {
        return Ok(());
    }
    let expired_object_ids: Vec<_> = expired_object_ids.collect();
    tracing::error!(
        ?expired_object_ids,
        "new SSTs are rejected because they have already been GCed"
    );
    // Surface the first offending object id in the error.
    Err(Error::InvalidSst(
        (expired_object_ids[0].object_id as u64).into(),
    ))
}
354
impl HummockManager {
    /// Pins the current version for `context_id` and returns a clone of it.
    ///
    /// The context's `min_pinned_id` is only moved (and committed to the meta
    /// store) when it is unset or greater than the current version id, so it
    /// never goes backwards via this path.
    pub async fn pin_version(&self, context_id: HummockContextId) -> Result<HummockVersion> {
        let versioning = self.versioning.read().await;
        let mut context_info = self.context_info.write().await;
        self.check_context_with_meta_node(context_id, &context_info)
            .await?;
        let _timer = start_measure_real_process_timer!(self, "pin_version");
        let mut pinned_versions = BTreeMapTransaction::new(&mut context_info.pinned_versions);
        // First pin for this context starts from INVALID_VERSION_ID ("unset").
        let mut context_pinned_version = pinned_versions.new_entry_txn_or_default(
            context_id,
            HummockPinnedVersion {
                context_id,
                min_pinned_id: INVALID_VERSION_ID.to_u64(),
            },
        );
        let version_id = versioning.current_version.id;
        let ret = versioning.current_version.clone();
        if HummockVersionId::new(context_pinned_version.min_pinned_id) == INVALID_VERSION_ID
            || HummockVersionId::new(context_pinned_version.min_pinned_id) > version_id
        {
            context_pinned_version.min_pinned_id = version_id.to_u64();
            commit_multi_var!(self.meta_store_ref(), context_pinned_version)?;
            trigger_pin_unpin_version_state(&self.metrics, &context_info.pinned_versions);
        }

        #[cfg(test)]
        {
            // Locks must be released before the consistency check re-acquires them.
            drop(context_info);
            drop(versioning);
            self.check_state_consistency().await;
        }

        Ok(ret)
    }

    /// Advances `context_id`'s pin to `unpin_before`, so versions before it are
    /// no longer pinned by this context. Panics if the pin would move backwards.
    pub async fn unpin_version_before(
        &self,
        context_id: HummockContextId,
        unpin_before: HummockVersionId,
    ) -> Result<()> {
        let mut context_info = self.context_info.write().await;
        self.check_context_with_meta_node(context_id, &context_info)
            .await?;
        let _timer = start_measure_real_process_timer!(self, "unpin_version_before");
        let mut pinned_versions = BTreeMapTransaction::new(&mut context_info.pinned_versions);
        let mut context_pinned_version = pinned_versions.new_entry_txn_or_default(
            context_id,
            HummockPinnedVersion {
                context_id,
                min_pinned_id: 0,
            },
        );
        // The pin is a watermark: callers may only move it forward.
        assert!(
            context_pinned_version.min_pinned_id <= unpin_before.to_u64(),
            "val must be monotonically non-decreasing. old = {}, new = {}.",
            context_pinned_version.min_pinned_id,
            unpin_before
        );
        context_pinned_version.min_pinned_id = unpin_before.to_u64();
        commit_multi_var!(self.meta_store_ref(), context_pinned_version)?;
        trigger_pin_unpin_version_state(&self.metrics, &context_info.pinned_versions);

        #[cfg(test)]
        {
            drop(context_info);
            self.check_state_consistency().await;
        }

        Ok(())
    }
}
432
433impl HummockManager {
435 pub async fn register_safe_point(&self) -> HummockVersionSafePoint {
436 let versioning = self.versioning.read().await;
437 let mut wl = self.context_info.write().await;
438 let safe_point = HummockVersionSafePoint {
439 id: versioning.current_version.id,
440 event_sender: self.event_sender.clone(),
441 };
442 wl.version_safe_points.push(safe_point.id);
443 trigger_safepoint_stat(&self.metrics, &wl.version_safe_points);
444 safe_point
445 }
446
447 pub async fn unregister_safe_point(&self, safe_point: HummockVersionId) {
448 let mut wl = self.context_info.write().await;
449 let version_safe_points = &mut wl.version_safe_points;
450 if let Some(pos) = version_safe_points.iter().position(|sp| *sp == safe_point) {
451 version_safe_points.remove(pos);
452 }
453 trigger_safepoint_stat(&self.metrics, &wl.version_safe_points);
454 }
455}
456
457fn trigger_safepoint_stat(metrics: &MetaMetrics, safepoints: &[HummockVersionId]) {
458 if let Some(sp) = safepoints.iter().min() {
459 metrics.min_safepoint_version_id.set(sp.to_u64() as _);
460 } else {
461 metrics
462 .min_safepoint_version_id
463 .set(HummockVersionId::MAX.to_u64() as _);
464 }
465}