1use std::collections::{BTreeMap, HashMap};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19
20use anyhow::anyhow;
21use bytes::Bytes;
22use itertools::Itertools;
23use parking_lot::lock_api::RwLock;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::monitor::MonitoredRwLock;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{
29 CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
30 version_archive_dir, version_checkpoint_path,
31};
32use risingwave_meta_model::{
33 compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
34 hummock_version_stats,
35};
36use risingwave_pb::hummock::{
37 HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo,
38 SubscribeCompactionEventRequest,
39};
40use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
41use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
42use tokio::sync::{Mutex, Semaphore};
43use tonic::Streaming;
44
45use crate::MetaResult;
46use crate::hummock::CompactorManagerRef;
47use crate::hummock::compaction::CompactStatus;
48use crate::hummock::error::Result;
49use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
50use crate::hummock::manager::context::ContextInfo;
51use crate::hummock::manager::gc::{FullGcState, GcManager};
52use crate::manager::{MetaSrvEnv, MetadataManager};
53use crate::model::{ClusterId, MetadataModelError};
54use crate::rpc::metrics::MetaMetrics;
55
56mod context;
57mod gc;
58mod tests;
59mod versioning;
60pub use context::HummockVersionSafePoint;
61use versioning::*;
62pub(crate) mod checkpoint;
63mod commit_epoch;
64mod compaction;
65pub mod sequence;
66pub mod table_write_throughput_statistic;
67pub mod time_travel;
68mod timer_task;
69mod transaction;
70mod utils;
71mod worker;
72
73pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
74use compaction::*;
75pub use compaction::{GroupState, GroupStateValidator};
76pub(crate) use utils::*;
77
78struct TableCommittedEpochNotifiers {
79 txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
80}
81
82impl TableCommittedEpochNotifiers {
83 fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
84 self.txs.retain(|table_id, txs| {
85 let mut is_dropped = false;
86 let mut committed_epoch = None;
87 for delta in deltas {
88 if delta.removed_table_ids.contains(table_id) {
89 is_dropped = true;
90 break;
91 }
92 if let Some(info) = delta.state_table_info_delta.get(table_id) {
93 committed_epoch = Some(info.committed_epoch);
94 }
95 }
96 if is_dropped {
97 false
98 } else if let Some(committed_epoch) = committed_epoch {
99 txs.retain(|tx| tx.send(committed_epoch).is_ok());
100 !txs.is_empty()
101 } else {
102 true
103 }
104 })
105 }
106}
/// In-memory state and handles of the Hummock manager.
///
/// Instances are built by `HummockManager::new` (or `with_config` in tests), which hand out the
/// manager wrapped in an `Arc` (see [`HummockManagerRef`]).
///
/// When several of the `MonitoredRwLock`s below are needed at once, `load_meta_store_state`
/// acquires them in the order `compaction`, `versioning`, `context_info`.
/// NOTE(review): presumably every caller must follow one global lock order — confirm.
pub struct HummockManager {
    /// Meta service environment (options, meta store, system parameters).
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    /// Compaction statuses and task assignments; populated from the meta store on startup.
    compaction: MonitoredRwLock<Compaction>,
    /// Current version, checkpoint, version deltas and version stats.
    versioning: MonitoredRwLock<Versioning>,
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    /// Per-context information such as pinned versions.
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    /// Sending half of the unbounded channel whose receiver is handed to `start_worker`.
    event_sender: HummockManagerEventSender,
    /// Remote object store built from the `state_store` system parameter.
    object_store: ObjectStoreRef,
    /// Derived from the data directory via `version_checkpoint_path`.
    version_checkpoint_path: String,
    /// Derived from the data directory via `version_archive_dir`.
    version_archive_dir: String,
    /// Initialised to `false`.
    pause_version_checkpoint: AtomicBool,
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    /// Channels registered through `subscribe_table_committed_epoch`.
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,

    /// Carries `(u32, Streaming<SubscribeCompactionEventRequest>)` pairs.
    /// NOTE(review): the `u32` is presumably the compactor context id — confirm.
    compactor_streams_change_tx: UnboundedSender<(u32, Streaming<SubscribeCompactionEventRequest>)>,

    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    /// Loaded via `load_now` on startup; falls back to `0` when nothing is stored.
    now: Mutex<u64>,
    /// Semaphore sized from `env.opts.max_inflight_time_travel_query`.
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    /// Replaced wholesale by `update_table_id_to_table_option`.
    table_id_to_table_option: parking_lot::RwLock<HashMap<u32, TableOption>>,
}

/// Shared, reference-counted handle to a [`HummockManager`].
pub type HummockManagerRef = Arc<HummockManager>;
154
155use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
156use risingwave_pb::catalog::Table;
157
/// Starts a timer on the `hummock_manager_real_process_time` metric of the given manager.
///
/// The first argument is any expression exposing a `metrics` field; the second is a literal
/// used as the single label value. The macro evaluates to the value returned by
/// `start_timer()`.
macro_rules! start_measure_real_process_timer {
    ($manager:expr, $label:literal) => {
        $manager
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$label])
            .start_timer()
    };
}
167pub(crate) use start_measure_real_process_timer;
168
169use crate::controller::SqlMetaStore;
170use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
171use crate::hummock::manager::worker::HummockManagerEventSender;
172
173impl HummockManager {
174 pub async fn new(
175 env: MetaSrvEnv,
176 metadata_manager: MetadataManager,
177 metrics: Arc<MetaMetrics>,
178 compactor_manager: CompactorManagerRef,
179 compactor_streams_change_tx: UnboundedSender<(
180 u32,
181 Streaming<SubscribeCompactionEventRequest>,
182 )>,
183 ) -> Result<HummockManagerRef> {
184 let compaction_group_manager = CompactionGroupManager::new(&env).await?;
185 Self::new_impl(
186 env,
187 metadata_manager,
188 metrics,
189 compactor_manager,
190 compaction_group_manager,
191 compactor_streams_change_tx,
192 )
193 .await
194 }
195
196 #[cfg(any(test, feature = "test"))]
197 pub(super) async fn with_config(
198 env: MetaSrvEnv,
199 cluster_controller: crate::controller::cluster::ClusterControllerRef,
200 catalog_controller: crate::controller::catalog::CatalogControllerRef,
201 metrics: Arc<MetaMetrics>,
202 compactor_manager: CompactorManagerRef,
203 config: risingwave_pb::hummock::CompactionConfig,
204 compactor_streams_change_tx: UnboundedSender<(
205 u32,
206 Streaming<SubscribeCompactionEventRequest>,
207 )>,
208 ) -> HummockManagerRef {
209 let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
210 .await
211 .unwrap();
212 let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
213 Self::new_impl(
214 env,
215 metadata_manager,
216 metrics,
217 compactor_manager,
218 compaction_group_manager,
219 compactor_streams_change_tx,
220 )
221 .await
222 .unwrap()
223 }
224
225 async fn new_impl(
226 env: MetaSrvEnv,
227 metadata_manager: MetadataManager,
228 metrics: Arc<MetaMetrics>,
229 compactor_manager: CompactorManagerRef,
230 compaction_group_manager: CompactionGroupManager,
231 compactor_streams_change_tx: UnboundedSender<(
232 u32,
233 Streaming<SubscribeCompactionEventRequest>,
234 )>,
235 ) -> Result<HummockManagerRef> {
236 let sys_params = env.system_params_reader().await;
237 let state_store_url = sys_params.state_store();
238 let state_store_dir: &str = sys_params.data_directory();
239 let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
240 let deterministic_mode = env.opts.compaction_deterministic_test;
241 let mut object_store_config = env.opts.object_store_config.clone();
242 object_store_config.set_atomic_write_dir();
245 let object_store = Arc::new(
246 build_remote_object_store(
247 state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
248 metrics.object_store_metric.clone(),
249 "Version Checkpoint",
250 Arc::new(object_store_config),
251 )
252 .await,
253 );
254 if !deterministic_mode {
258 write_exclusive_cluster_id(
259 state_store_dir,
260 env.cluster_id().clone(),
261 object_store.clone(),
262 )
263 .await?;
264
265 if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
267 && !env.opts.do_not_config_object_storage_lifecycle
268 {
269 let is_bucket_expiration_configured =
270 s3.inner().configure_bucket_lifecycle(state_store_dir).await;
271 if is_bucket_expiration_configured {
272 return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
273 Please disable object expiration and restart the cluster.")
274 .into());
275 }
276 }
277 }
278 let version_checkpoint_path = version_checkpoint_path(state_store_dir);
279 let version_archive_dir = version_archive_dir(state_store_dir);
280 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
281 let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
282 let gc_manager = GcManager::new(
283 object_store.clone(),
284 state_store_dir,
285 use_new_object_prefix_strategy,
286 );
287
288 let max_table_statistic_expired_time = std::cmp::max(
289 env.opts.table_stat_throuput_window_seconds_for_split,
290 env.opts.table_stat_throuput_window_seconds_for_merge,
291 ) as i64;
292
293 let instance = HummockManager {
294 env,
295 versioning: MonitoredRwLock::new(
296 metrics.hummock_manager_lock_time.clone(),
297 Default::default(),
298 "hummock_manager::versioning",
299 ),
300 compaction: MonitoredRwLock::new(
301 metrics.hummock_manager_lock_time.clone(),
302 Default::default(),
303 "hummock_manager::compaction",
304 ),
305 compaction_group_manager: MonitoredRwLock::new(
306 metrics.hummock_manager_lock_time.clone(),
307 compaction_group_manager,
308 "hummock_manager::compaction_group_manager",
309 ),
310 context_info: MonitoredRwLock::new(
311 metrics.hummock_manager_lock_time.clone(),
312 Default::default(),
313 "hummock_manager::context_info",
314 ),
315 metrics,
316 metadata_manager,
317 compactor_manager,
319 event_sender: tx,
320 object_store,
321 version_checkpoint_path,
322 version_archive_dir,
323 pause_version_checkpoint: AtomicBool::new(false),
324 table_write_throughput_statistic_manager: parking_lot::RwLock::new(
325 TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
326 ),
327 table_committed_epoch_notifiers: parking_lot::Mutex::new(
328 TableCommittedEpochNotifiers {
329 txs: Default::default(),
330 },
331 ),
332 compactor_streams_change_tx,
333 compaction_state: CompactionState::new(),
334 full_gc_state: FullGcState::new().into(),
335 now: Mutex::new(0),
336 inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
337 gc_manager,
338 table_id_to_table_option: RwLock::new(HashMap::new()),
339 };
340 let instance = Arc::new(instance);
341 instance.init_time_travel_state().await?;
342 instance.start_worker(rx).await;
343 instance.load_meta_store_state().await?;
344 instance.release_invalid_contexts().await?;
345 instance.release_meta_context().await?;
347 Ok(instance)
348 }
349
    /// Returns the SQL meta store held by the environment.
    fn meta_store_ref(&self) -> &SqlMetaStore {
        self.env.meta_store_ref()
    }
353
354 async fn load_meta_store_state(&self) -> Result<()> {
356 let now = self.load_now().await?;
357 *self.now.lock().await = now.unwrap_or(0);
358
359 let mut compaction_guard = self.compaction.write().await;
360 let mut versioning_guard = self.versioning.write().await;
361 let mut context_info_guard = self.context_info.write().await;
362 self.load_meta_store_state_impl(
363 &mut compaction_guard,
364 &mut versioning_guard,
365 &mut context_info_guard,
366 )
367 .await
368 }
369
370 async fn load_meta_store_state_impl(
372 &self,
373 compaction_guard: &mut Compaction,
374 versioning_guard: &mut Versioning,
375 context_info: &mut ContextInfo,
376 ) -> Result<()> {
377 use sea_orm::EntityTrait;
378 let meta_store = self.meta_store_ref();
379 let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
380 compaction_status::Entity::find()
381 .all(&meta_store.conn)
382 .await
383 .map_err(MetadataModelError::from)?
384 .into_iter()
385 .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
386 .collect();
387 if !compaction_statuses.is_empty() {
388 compaction_guard.compaction_statuses = compaction_statuses;
389 }
390
391 compaction_guard.compact_task_assignment = compaction_task::Entity::find()
392 .all(&meta_store.conn)
393 .await
394 .map_err(MetadataModelError::from)?
395 .into_iter()
396 .map(|m| {
397 (
398 m.id as HummockCompactionTaskId,
399 PbCompactTaskAssignment::from(m),
400 )
401 })
402 .collect();
403
404 let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
405 hummock_version_delta::Entity::find()
406 .all(&meta_store.conn)
407 .await
408 .map_err(MetadataModelError::from)?
409 .into_iter()
410 .map(|m| {
411 (
412 HummockVersionId::new(m.id as _),
413 HummockVersionDelta::from_persisted_protobuf(&m.into()),
414 )
415 })
416 .collect();
417
418 let checkpoint = self.try_read_checkpoint().await?;
419 let mut redo_state = if let Some(c) = checkpoint {
420 versioning_guard.checkpoint = c;
421 versioning_guard.checkpoint.version.clone()
422 } else {
423 let default_compaction_config = self
424 .compaction_group_manager
425 .read()
426 .await
427 .default_compaction_config();
428 let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
429 tracing::info!("init hummock version checkpoint");
430 versioning_guard.checkpoint = HummockVersionCheckpoint {
431 version: checkpoint_version.clone(),
432 stale_objects: Default::default(),
433 };
434 self.write_checkpoint(&versioning_guard.checkpoint).await?;
435 checkpoint_version
436 };
437 let mut applied_delta_count = 0;
438 let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
439 tracing::info!(
440 total_delta = hummock_version_deltas.len(),
441 total_to_apply,
442 "Start redo Hummock version."
443 );
444 for version_delta in hummock_version_deltas
445 .range(redo_state.id + 1..)
446 .map(|(_, v)| v)
447 {
448 assert_eq!(
449 version_delta.prev_id, redo_state.id,
450 "delta prev_id {}, redo state id {}",
451 version_delta.prev_id, redo_state.id
452 );
453 redo_state.apply_version_delta(version_delta);
454 applied_delta_count += 1;
455 if applied_delta_count % 1000 == 0 {
456 tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
457 }
458 }
459 tracing::info!("Finish redo Hummock version.");
460 versioning_guard.version_stats = hummock_version_stats::Entity::find()
461 .one(&meta_store.conn)
462 .await
463 .map_err(MetadataModelError::from)?
464 .map(HummockVersionStats::from)
465 .unwrap_or_else(|| HummockVersionStats {
466 hummock_version_id: 0,
468 ..Default::default()
469 });
470
471 versioning_guard.current_version = redo_state;
472 versioning_guard.hummock_version_deltas = hummock_version_deltas;
473
474 context_info.pinned_versions = hummock_pinned_version::Entity::find()
475 .all(&meta_store.conn)
476 .await
477 .map_err(MetadataModelError::from)?
478 .into_iter()
479 .map(|m| (m.context_id as HummockContextId, m.into()))
480 .collect();
481
482 self.initial_compaction_group_config_after_load(
483 versioning_guard,
484 self.compaction_group_manager.write().await.deref_mut(),
485 )
486 .await?;
487
488 Ok(())
489 }
490
    /// Not supported: both arguments are ignored and the call always panics.
    ///
    /// # Panics
    ///
    /// Always panics with "kv meta store is deprecated".
    pub fn init_metadata_for_version_replay(
        &self,
        _table_catalogs: Vec<Table>,
        _compaction_groups: Vec<PbCompactionGroupInfo>,
    ) -> Result<()> {
        unimplemented!("kv meta store is deprecated");
    }
498
499 pub async fn replay_version_delta(
503 &self,
504 mut version_delta: HummockVersionDelta,
505 ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
506 let mut versioning_guard = self.versioning.write().await;
507 version_delta.id = versioning_guard.current_version.next_version_id();
509 version_delta.prev_id = versioning_guard.current_version.id;
510 versioning_guard
511 .current_version
512 .apply_version_delta(&version_delta);
513
514 let version_new = versioning_guard.current_version.clone();
515 let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
516 Ok((version_new, compaction_group_ids))
517 }
518
519 pub async fn disable_commit_epoch(&self) -> HummockVersion {
520 let mut versioning_guard = self.versioning.write().await;
521 versioning_guard.disable_commit_epochs = true;
522 versioning_guard.current_version.clone()
523 }
524
    /// Returns a reference to the metadata manager owned by this Hummock manager.
    pub fn metadata_manager(&self) -> &MetadataManager {
        &self.metadata_manager
    }
528
    /// Returns the media type reported by the underlying object store.
    pub fn object_store_media_type(&self) -> &'static str {
        self.object_store.media_type()
    }
532
533 pub fn update_table_id_to_table_option(
534 &self,
535 new_table_id_to_table_option: HashMap<u32, TableOption>,
536 ) {
537 *self.table_id_to_table_option.write() = new_table_id_to_table_option;
538 }
539
540 pub fn metadata_manager_ref(&self) -> &MetadataManager {
541 &self.metadata_manager
542 }
543
544 pub async fn subscribe_table_committed_epoch(
545 &self,
546 table_id: TableId,
547 ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
548 let version = self.versioning.read().await;
549 if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
550 let (tx, rx) = unbounded_channel();
551 self.table_committed_epoch_notifiers
552 .lock()
553 .txs
554 .entry(table_id)
555 .or_default()
556 .push(tx);
557 Ok((epoch, rx))
558 } else {
559 Err(anyhow!("table {} not exist", table_id).into())
560 }
561 }
562}
563
564async fn write_exclusive_cluster_id(
565 state_store_dir: &str,
566 cluster_id: ClusterId,
567 object_store: ObjectStoreRef,
568) -> Result<()> {
569 const CLUSTER_ID_DIR: &str = "cluster_id";
570 const CLUSTER_ID_NAME: &str = "0";
571 let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
572 let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
573 tracing::info!("try reading cluster_id");
574 match object_store.read(&cluster_id_full_path, ..).await {
575 Ok(stored_cluster_id) => {
576 let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
577 if cluster_id.deref() == stored_cluster_id {
578 return Ok(());
579 }
580
581 Err(ObjectError::internal(format!(
582 "Data directory is already used by another cluster with id {:?}, path {}.",
583 stored_cluster_id, cluster_id_full_path,
584 ))
585 .into())
586 }
587 Err(e) => {
588 if e.is_object_not_found_error() {
589 tracing::info!("cluster_id not found, writing cluster_id");
590 object_store
591 .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
592 .await?;
593 return Ok(());
594 }
595 Err(e.into())
596 }
597 }
598}