use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use anyhow::anyhow;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::lock_api::RwLock;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
    version_archive_dir, version_checkpoint_path,
};
use risingwave_meta_model::{
    compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
    hummock_version_stats,
};
use risingwave_pb::hummock::{
    HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo,
    SubscribeCompactionEventRequest,
};
use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{Mutex, Semaphore};
use tonic::Streaming;

use crate::MetaResult;
use crate::hummock::CompactorManagerRef;
use crate::hummock::compaction::CompactStatus;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::{FullGcState, GcManager};
use crate::manager::{MetaSrvEnv, MetadataManager};
use crate::model::{ClusterId, MetadataModelError};
use crate::rpc::metrics::MetaMetrics;

mod context;
mod gc;
mod tests;
mod versioning;
pub use context::HummockVersionSafePoint;
use versioning::*;
pub(crate) mod checkpoint;
mod commit_epoch;
mod compaction;
pub mod sequence;
pub mod table_write_throughput_statistic;
pub mod time_travel;
mod timer_task;
mod transaction;
mod utils;
mod worker;

pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
pub use compaction::compaction_event_loop::*;
use compaction::*;
pub use compaction::{GroupState, GroupStateValidator};
pub(crate) use utils::*;

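/// Per-table subscribers that want to be notified whenever a table's committed epoch advances.
/// Senders are registered by `subscribe_table_committed_epoch` and notified via `notify_deltas`.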
struct TableCommittedEpochNotifiers {
    txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
}

impl TableCommittedEpochNotifiers {
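    /// For each subscribed table, scans the deltas in order: if the table was dropped, its
    /// senders are removed; otherwise the most recent committed epoch found is sent to every
    /// live subscriber, and closed channels are pruned from the map.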
    fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
        self.txs.retain(|table_id, txs| {
            let mut is_dropped = false;
            let mut committed_epoch = None;
            for delta in deltas {
                if delta.removed_table_ids.contains(table_id) {
                    is_dropped = true;
                    break;
                }
                if let Some(info) = delta.state_table_info_delta.get(table_id) {
                    committed_epoch = Some(info.committed_epoch);
                }
            }
            if is_dropped {
                false
            } else if let Some(committed_epoch) = committed_epoch {
                txs.retain(|tx| tx.send(committed_epoch).is_ok());
                !txs.is_empty()
            } else {
                true
            }
        })
    }
}

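/// `HummockManager` runs on the meta node and owns the Hummock (LSM state store) metadata:
/// the current [`HummockVersion`] with its deltas and checkpoints, compaction and
/// compaction-group state, garbage collection, and time-travel bookkeeping.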
pub struct HummockManager {
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
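    // When multiple of the `MonitoredRwLock`s below are held at once in this file, they are
    // acquired in the order `compaction` -> `versioning` -> `context_info`
    // (see `load_meta_store_state`).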
    compaction: MonitoredRwLock<Compaction>,
    versioning: MonitoredRwLock<Versioning>,
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    pub iceberg_compactor_manager: Arc<IcebergCompactorManager>,
    event_sender: HummockManagerEventSender,
    object_store: ObjectStoreRef,
    version_checkpoint_path: String,
    version_archive_dir: String,
    pause_version_checkpoint: AtomicBool,
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,

    compactor_streams_change_tx: UnboundedSender<(u32, Streaming<SubscribeCompactionEventRequest>)>,

    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    now: Mutex<u64>,
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    table_id_to_table_option: parking_lot::RwLock<HashMap<u32, TableOption>>,
}

pub type HummockManagerRef = Arc<HummockManager>;

use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
use risingwave_pb::catalog::Table;

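/// Starts a timer on the `hummock_manager_real_process_time` histogram, labeled with the given
/// function name. The returned guard records the elapsed wall-clock time when dropped.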
macro_rules! start_measure_real_process_timer {
    ($hummock_mgr:expr, $func_name:literal) => {
        $hummock_mgr
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$func_name])
            .start_timer()
    };
}
pub(crate) use start_measure_real_process_timer;

use super::IcebergCompactorManager;
use crate::controller::SqlMetaStore;
use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
use crate::hummock::manager::worker::HummockManagerEventSender;

impl HummockManager {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compactor_streams_change_tx: UnboundedSender<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let compaction_group_manager = CompactionGroupManager::new(&env).await?;
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
    }

    #[cfg(any(test, feature = "test"))]
    pub(super) async fn with_config(
        env: MetaSrvEnv,
        cluster_controller: crate::controller::cluster::ClusterControllerRef,
        catalog_controller: crate::controller::catalog::CatalogControllerRef,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        config: risingwave_pb::hummock::CompactionConfig,
        compactor_streams_change_tx: UnboundedSender<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> HummockManagerRef {
        let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
            .await
            .unwrap();
        let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
        .unwrap()
    }

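    /// Shared constructor body: builds the remote object store from the system parameters,
    /// verifies exclusive ownership of the data directory (and, on S3, that no bucket lifecycle
    /// expiration is configured) unless running in deterministic-test mode, then assembles the
    /// manager, starts its worker, and loads persisted state from the meta store.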
    async fn new_impl(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compaction_group_manager: CompactionGroupManager,
        compactor_streams_change_tx: UnboundedSender<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let sys_params = env.system_params_reader().await;
        let state_store_url = sys_params.state_store();
        let state_store_dir: &str = sys_params.data_directory();
        let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
        let deterministic_mode = env.opts.compaction_deterministic_test;
        let mut object_store_config = env.opts.object_store_config.clone();
        object_store_config.set_atomic_write_dir();
        let object_store = Arc::new(
            build_remote_object_store(
                state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
                metrics.object_store_metric.clone(),
                "Version Checkpoint",
                Arc::new(object_store_config),
            )
            .await,
        );
        if !deterministic_mode {
            write_exclusive_cluster_id(
                state_store_dir,
                env.cluster_id().clone(),
                object_store.clone(),
            )
            .await?;

            if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
                && !env.opts.do_not_config_object_storage_lifecycle
            {
                let is_bucket_expiration_configured =
                    s3.inner().configure_bucket_lifecycle(state_store_dir).await;
                if is_bucket_expiration_configured {
                    return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
                    Please disable object expiration and restart the cluster.")
                    .into());
                }
            }
        }
        let version_checkpoint_path = version_checkpoint_path(state_store_dir);
        let version_archive_dir = version_archive_dir(state_store_dir);
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
        let gc_manager = GcManager::new(
            object_store.clone(),
            state_store_dir,
            use_new_object_prefix_strategy,
        );

        let max_table_statistic_expired_time = std::cmp::max(
            env.opts.table_stat_throuput_window_seconds_for_split,
            env.opts.table_stat_throuput_window_seconds_for_merge,
        ) as i64;

        let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

        let instance = HummockManager {
            env,
            versioning: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::versioning",
            ),
            compaction: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::compaction",
            ),
            compaction_group_manager: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                compaction_group_manager,
                "hummock_manager::compaction_group_manager",
            ),
            context_info: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::context_info",
            ),
            metrics,
            metadata_manager,
            compactor_manager,
            iceberg_compactor_manager,
            event_sender: tx,
            object_store,
            version_checkpoint_path,
            version_archive_dir,
            pause_version_checkpoint: AtomicBool::new(false),
            table_write_throughput_statistic_manager: parking_lot::RwLock::new(
                TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
            ),
            table_committed_epoch_notifiers: parking_lot::Mutex::new(
                TableCommittedEpochNotifiers {
                    txs: Default::default(),
                },
            ),
            compactor_streams_change_tx,
            compaction_state: CompactionState::new(),
            full_gc_state: FullGcState::new().into(),
            now: Mutex::new(0),
            inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
            gc_manager,
            table_id_to_table_option: RwLock::new(HashMap::new()),
        };
        let instance = Arc::new(instance);
        instance.init_time_travel_state().await?;
        instance.start_worker(rx).await;
        instance.load_meta_store_state().await?;
        instance.release_invalid_contexts().await?;
        instance.release_meta_context().await?;
        Ok(instance)
    }

    fn meta_store_ref(&self) -> &SqlMetaStore {
        self.env.meta_store_ref()
    }

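    /// Restores in-memory state from the meta store at startup, holding the `compaction`,
    /// `versioning`, and `context_info` write locks for the duration of the load.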
    async fn load_meta_store_state(&self) -> Result<()> {
        let now = self.load_now().await?;
        *self.now.lock().await = now.unwrap_or(0);

        let mut compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let mut context_info_guard = self.context_info.write().await;
        self.load_meta_store_state_impl(
            &mut compaction_guard,
            &mut versioning_guard,
            &mut context_info_guard,
        )
        .await
    }

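    /// Loads compaction statuses, compaction task assignments, version deltas, version stats, and
    /// pinned versions from the meta store, then rebuilds the current [`HummockVersion`] by
    /// replaying every delta newer than the latest checkpoint (writing an initial checkpoint
    /// first if none exists).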
    async fn load_meta_store_state_impl(
        &self,
        compaction_guard: &mut Compaction,
        versioning_guard: &mut Versioning,
        context_info: &mut ContextInfo,
    ) -> Result<()> {
        use sea_orm::EntityTrait;
        let meta_store = self.meta_store_ref();
        let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
            compaction_status::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();
        if !compaction_statuses.is_empty() {
            compaction_guard.compaction_statuses = compaction_statuses;
        }

        compaction_guard.compact_task_assignment = compaction_task::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| {
                (
                    m.id as HummockCompactionTaskId,
                    PbCompactTaskAssignment::from(m),
                )
            })
            .collect();

        let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
            hummock_version_delta::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| {
                    (
                        HummockVersionId::new(m.id as _),
                        HummockVersionDelta::from_persisted_protobuf(&m.into()),
                    )
                })
                .collect();

        let checkpoint = self.try_read_checkpoint().await?;
        let mut redo_state = if let Some(c) = checkpoint {
            versioning_guard.checkpoint = c;
            versioning_guard.checkpoint.version.clone()
        } else {
            let default_compaction_config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config();
            let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
            tracing::info!("init hummock version checkpoint");
            versioning_guard.checkpoint = HummockVersionCheckpoint {
                version: checkpoint_version.clone(),
                stale_objects: Default::default(),
            };
            self.write_checkpoint(&versioning_guard.checkpoint).await?;
            checkpoint_version
        };
        let mut applied_delta_count = 0;
        let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
        tracing::info!(
            total_delta = hummock_version_deltas.len(),
            total_to_apply,
            "Start redo Hummock version."
        );
        for version_delta in hummock_version_deltas
            .range(redo_state.id + 1..)
            .map(|(_, v)| v)
        {
            assert_eq!(
                version_delta.prev_id, redo_state.id,
                "delta prev_id {}, redo state id {}",
                version_delta.prev_id, redo_state.id
            );
            redo_state.apply_version_delta(version_delta);
            applied_delta_count += 1;
            if applied_delta_count % 1000 == 0 {
                tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
            }
        }
        tracing::info!("Finish redo Hummock version.");
        versioning_guard.version_stats = hummock_version_stats::Entity::find()
            .one(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .map(HummockVersionStats::from)
            .unwrap_or_else(|| HummockVersionStats {
                hummock_version_id: 0,
                ..Default::default()
            });

        versioning_guard.current_version = redo_state;
        versioning_guard.hummock_version_deltas = hummock_version_deltas;

        context_info.pinned_versions = hummock_pinned_version::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.context_id as HummockContextId, m.into()))
            .collect();

        self.initial_compaction_group_config_after_load(
            versioning_guard,
            self.compaction_group_manager.write().await.deref_mut(),
        )
        .await?;

        Ok(())
    }

    pub fn init_metadata_for_version_replay(
        &self,
        _table_catalogs: Vec<Table>,
        _compaction_groups: Vec<PbCompactionGroupInfo>,
    ) -> Result<()> {
        unimplemented!("kv meta store is deprecated");
    }

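    /// Applies `version_delta` on top of the current in-memory version, assigning it the next
    /// version id, and returns the resulting version together with the compaction group ids the
    /// delta touches. Nothing is persisted to the meta store.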
    pub async fn replay_version_delta(
        &self,
        mut version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let mut versioning_guard = self.versioning.write().await;
        version_delta.id = versioning_guard.current_version.next_version_id();
        version_delta.prev_id = versioning_guard.current_version.id;
        versioning_guard
            .current_version
            .apply_version_delta(&version_delta);

        let version_new = versioning_guard.current_version.clone();
        let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
        Ok((version_new, compaction_group_ids))
    }

    pub async fn disable_commit_epoch(&self) -> HummockVersion {
        let mut versioning_guard = self.versioning.write().await;
        versioning_guard.disable_commit_epochs = true;
        versioning_guard.current_version.clone()
    }

    pub fn metadata_manager(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    pub fn object_store_media_type(&self) -> &'static str {
        self.object_store.media_type()
    }

    pub fn update_table_id_to_table_option(
        &self,
        new_table_id_to_table_option: HashMap<u32, TableOption>,
    ) {
        *self.table_id_to_table_option.write() = new_table_id_to_table_option;
    }

    pub fn metadata_manager_ref(&self) -> &MetadataManager {
        &self.metadata_manager
    }

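    /// Returns the table's current committed epoch plus a receiver that will be notified of each
    /// later committed epoch; returns an error if the table does not exist in the current version.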
    pub async fn subscribe_table_committed_epoch(
        &self,
        table_id: TableId,
    ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
        let version = self.versioning.read().await;
        if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
            let (tx, rx) = unbounded_channel();
            self.table_committed_epoch_notifiers
                .lock()
                .txs
                .entry(table_id)
                .or_default()
                .push(tx);
            Ok((epoch, rx))
        } else {
            Err(anyhow!("table {} does not exist", table_id).into())
        }
    }
}

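/// Ensures the data directory is used by exactly one cluster: if a cluster id is already stored
/// there it must match `cluster_id`, otherwise an error is returned; if none is stored, this
/// cluster's id is written.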
async fn write_exclusive_cluster_id(
    state_store_dir: &str,
    cluster_id: ClusterId,
    object_store: ObjectStoreRef,
) -> Result<()> {
    const CLUSTER_ID_DIR: &str = "cluster_id";
    const CLUSTER_ID_NAME: &str = "0";
    let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
    let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
    tracing::info!("try reading cluster_id");
    match object_store.read(&cluster_id_full_path, ..).await {
        Ok(stored_cluster_id) => {
            let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
            if cluster_id.deref() == stored_cluster_id {
                return Ok(());
            }

            Err(ObjectError::internal(format!(
                "Data directory is already used by another cluster with id {:?}, path {}.",
                stored_cluster_id, cluster_id_full_path,
            ))
            .into())
        }
        Err(e) => {
            if e.is_object_not_found_error() {
                tracing::info!("cluster_id not found, writing cluster_id");
                object_store
                    .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
                    .await?;
                return Ok(());
            }
            Err(e.into())
        }
    }
}