use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use anyhow::anyhow;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::lock_api::RwLock;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
    version_archive_dir, version_checkpoint_path,
};
use risingwave_meta_model::{
    compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
    hummock_version_stats,
};
use risingwave_pb::hummock::{
    HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo,
    SubscribeCompactionEventRequest,
};
use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{Mutex, Semaphore};
use tonic::Streaming;

use crate::MetaResult;
use crate::hummock::CompactorManagerRef;
use crate::hummock::compaction::CompactStatus;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::{FullGcState, GcManager};
use crate::manager::{MetaSrvEnv, MetadataManager};
use crate::model::{ClusterId, MetadataModelError};
use crate::rpc::metrics::MetaMetrics;

mod context;
mod gc;
mod tests;
mod versioning;
pub use context::HummockVersionSafePoint;
use versioning::*;
pub(crate) mod checkpoint;
mod commit_epoch;
mod compaction;
pub mod sequence;
pub mod table_write_throughput_statistic;
pub mod time_travel;
mod timer_task;
mod transaction;
mod utils;
mod worker;

pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
pub use compaction::compaction_event_loop::*;
use compaction::*;
pub use compaction::{GroupState, GroupStateValidator};
pub(crate) use utils::*;

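/// Per-table channel senders used to push newly committed epochs to subscribers registered
/// through `HummockManager::subscribe_table_committed_epoch`.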
struct TableCommittedEpochNotifiers {
    txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
}

impl TableCommittedEpochNotifiers {
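    /// Walks a batch of version deltas and, for each subscribed table, drops all senders if the
    /// table was removed, otherwise forwards the latest committed epoch and prunes senders whose
    /// receivers have been dropped.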
    fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
        self.txs.retain(|table_id, txs| {
            let mut is_dropped = false;
            let mut committed_epoch = None;
            for delta in deltas {
                if delta.removed_table_ids.contains(table_id) {
                    is_dropped = true;
                    break;
                }
                if let Some(info) = delta.state_table_info_delta.get(table_id) {
                    committed_epoch = Some(info.committed_epoch);
                }
            }
            if is_dropped {
                false
            } else if let Some(committed_epoch) = committed_epoch {
                txs.retain(|tx| tx.send(committed_epoch).is_ok());
                !txs.is_empty()
            } else {
                true
            }
        })
    }
}

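/// Central coordinator of Hummock metadata on the meta node. It owns the in-memory
/// `HummockVersion` and its deltas, compaction and compaction-group state, and per-context
/// bookkeeping behind monitored locks, and drives version checkpointing, GC, and time-travel
/// maintenance.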
pub struct HummockManager {
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    compaction: MonitoredRwLock<Compaction>,
    versioning: MonitoredRwLock<Versioning>,
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    pub iceberg_compactor_manager: Arc<IcebergCompactorManager>,
    event_sender: HummockManagerEventSender,
    object_store: ObjectStoreRef,
    version_checkpoint_path: String,
    version_archive_dir: String,
    pause_version_checkpoint: AtomicBool,
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,

    compactor_streams_change_tx:
        UnboundedSender<(HummockContextId, Streaming<SubscribeCompactionEventRequest>)>,

    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    now: Mutex<u64>,
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    table_id_to_table_option: parking_lot::RwLock<HashMap<TableId, TableOption>>,
}

pub type HummockManagerRef = Arc<HummockManager>;

use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
use risingwave_pb::catalog::Table;

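/// Starts a timer on the `hummock_manager_real_process_time` histogram, labeled with the given
/// function name; the returned guard observes the elapsed wall-clock time when dropped.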
macro_rules! start_measure_real_process_timer {
    ($hummock_mgr:expr, $func_name:literal) => {
        $hummock_mgr
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$func_name])
            .start_timer()
    };
}
pub(crate) use start_measure_real_process_timer;

use super::IcebergCompactorManager;
use crate::controller::SqlMetaStore;
use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
use crate::hummock::manager::worker::HummockManagerEventSender;

impl HummockManager {
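    /// Creates a `HummockManager`, loading the persisted compaction group configuration from the
    /// meta store before delegating to `new_impl`.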
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let compaction_group_manager = CompactionGroupManager::new(&env).await?;
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
    }

    #[cfg(any(test, feature = "test"))]
    pub(super) async fn with_config(
        env: MetaSrvEnv,
        cluster_controller: crate::controller::cluster::ClusterControllerRef,
        catalog_controller: crate::controller::catalog::CatalogControllerRef,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        config: risingwave_pb::hummock::CompactionConfig,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> HummockManagerRef {
        let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
            .await
            .unwrap();
        let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
        .unwrap()
    }

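    /// Shared constructor body: builds the remote object store from the system parameters, claims
    /// exclusive ownership of the data directory via the cluster id (skipped in deterministic
    /// compaction tests), rejects S3 buckets with object expiration configured, then initializes
    /// time-travel state, loads persisted Hummock metadata, and starts the event worker.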
    async fn new_impl(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compaction_group_manager: CompactionGroupManager,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let sys_params = env.system_params_reader().await;
        let state_store_url = sys_params.state_store();

        let state_store_dir: &str = sys_params.data_directory();
        let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
        let deterministic_mode = env.opts.compaction_deterministic_test;
        let mut object_store_config = env.opts.object_store_config.clone();
        object_store_config.set_atomic_write_dir();
        let object_store = Arc::new(
            build_remote_object_store(
                state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
                metrics.object_store_metric.clone(),
                "Version Checkpoint",
                Arc::new(object_store_config),
            )
            .await,
        );
        if !deterministic_mode {
            write_exclusive_cluster_id(
                state_store_dir,
                env.cluster_id().clone(),
                object_store.clone(),
            )
            .await?;

            if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
                && !env.opts.do_not_config_object_storage_lifecycle
            {
                let is_bucket_expiration_configured =
                    s3.inner().configure_bucket_lifecycle(state_store_dir).await;
                if is_bucket_expiration_configured {
                    return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
                    Please disable object expiration and restart the cluster.")
                    .into());
                }
            }
        }
        let version_checkpoint_path = version_checkpoint_path(state_store_dir);
        let version_archive_dir = version_archive_dir(state_store_dir);
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
        let gc_manager = GcManager::new(
            object_store.clone(),
            state_store_dir,
            use_new_object_prefix_strategy,
        );

        let max_table_statistic_expired_time = std::cmp::max(
            env.opts.table_stat_throuput_window_seconds_for_split,
            env.opts.table_stat_throuput_window_seconds_for_merge,
        ) as i64;

        let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

        let instance = HummockManager {
            env,
            versioning: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::versioning",
            ),
            compaction: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::compaction",
            ),
            compaction_group_manager: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                compaction_group_manager,
                "hummock_manager::compaction_group_manager",
            ),
            context_info: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::context_info",
            ),
            metrics,
            metadata_manager,
            compactor_manager,
            iceberg_compactor_manager,
            event_sender: tx,
            object_store,
            version_checkpoint_path,
            version_archive_dir,
            pause_version_checkpoint: AtomicBool::new(false),
            table_write_throughput_statistic_manager: parking_lot::RwLock::new(
                TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
            ),
            table_committed_epoch_notifiers: parking_lot::Mutex::new(
                TableCommittedEpochNotifiers {
                    txs: Default::default(),
                },
            ),
            compactor_streams_change_tx,
            compaction_state: CompactionState::new(),
            full_gc_state: FullGcState::new().into(),
            now: Mutex::new(0),
            inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
            gc_manager,
            table_id_to_table_option: RwLock::new(HashMap::new()),
        };
        let instance = Arc::new(instance);
        instance.init_time_travel_state().await?;
        instance.start_worker(rx);
        instance.load_meta_store_state().await?;
        instance.release_invalid_contexts().await?;
        instance.release_meta_context().await?;
        Ok(instance)
    }

    fn meta_store_ref(&self) -> &SqlMetaStore {
        self.env.meta_store_ref()
    }

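    /// Restores the persisted `now` timestamp, then loads Hummock state from the meta store while
    /// holding the compaction, versioning, and context-info write locks.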
    async fn load_meta_store_state(&self) -> Result<()> {
        let now = self.load_now().await?;
        *self.now.lock().await = now.unwrap_or(0);

        let mut compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let mut context_info_guard = self.context_info.write().await;
        self.load_meta_store_state_impl(
            &mut compaction_guard,
            &mut versioning_guard,
            &mut context_info_guard,
        )
        .await
    }

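    /// Rebuilds in-memory state from the meta store: loads compaction statuses and task
    /// assignments, reads the latest version checkpoint (creating an initial one if absent),
    /// replays all newer version deltas on top of it to recover the current version, and restores
    /// version stats and pinned versions before re-initializing compaction group configs.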
    async fn load_meta_store_state_impl(
        &self,
        compaction_guard: &mut Compaction,
        versioning_guard: &mut Versioning,
        context_info: &mut ContextInfo,
    ) -> Result<()> {
        use sea_orm::EntityTrait;
        let meta_store = self.meta_store_ref();
        let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
            compaction_status::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();
        if !compaction_statuses.is_empty() {
            compaction_guard.compaction_statuses = compaction_statuses;
        }

        compaction_guard.compact_task_assignment = compaction_task::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| {
                (
                    m.id as HummockCompactionTaskId,
                    PbCompactTaskAssignment::from(m),
                )
            })
            .collect();

        let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
            hummock_version_delta::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| {
                    (
                        HummockVersionId::new(m.id as _),
                        HummockVersionDelta::from_persisted_protobuf(&m.into()),
                    )
                })
                .collect();

        let checkpoint = self.try_read_checkpoint().await?;
        let mut redo_state = if let Some(c) = checkpoint {
            versioning_guard.checkpoint = c;
            versioning_guard.checkpoint.version.clone()
        } else {
            let default_compaction_config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config();
            let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
            tracing::info!("init hummock version checkpoint");
            versioning_guard.checkpoint = HummockVersionCheckpoint {
                version: checkpoint_version.clone(),
                stale_objects: Default::default(),
            };
            self.write_checkpoint(&versioning_guard.checkpoint).await?;
            checkpoint_version
        };
        let mut applied_delta_count = 0;
        let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
        tracing::info!(
            total_delta = hummock_version_deltas.len(),
            total_to_apply,
            "Start redo Hummock version."
        );
        for version_delta in hummock_version_deltas
            .range(redo_state.id + 1..)
            .map(|(_, v)| v)
        {
            assert_eq!(
                version_delta.prev_id, redo_state.id,
                "delta prev_id {}, redo state id {}",
                version_delta.prev_id, redo_state.id
            );
            redo_state.apply_version_delta(version_delta);
            applied_delta_count += 1;
            if applied_delta_count % 1000 == 0 {
                tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
            }
        }
        tracing::info!("Finish redo Hummock version.");
        versioning_guard.version_stats = hummock_version_stats::Entity::find()
            .one(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .map(HummockVersionStats::from)
            .unwrap_or_else(|| HummockVersionStats {
                hummock_version_id: 0,
                ..Default::default()
            });

        versioning_guard.current_version = redo_state;
        versioning_guard.hummock_version_deltas = hummock_version_deltas;

        context_info.pinned_versions = hummock_pinned_version::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.context_id as HummockContextId, m.into()))
            .collect();

        self.initial_compaction_group_config_after_load(
            versioning_guard,
            self.compaction_group_manager.write().await.deref_mut(),
        )
        .await?;

        Ok(())
    }

    pub fn init_metadata_for_version_replay(
        &self,
        _table_catalogs: Vec<Table>,
        _compaction_groups: Vec<PbCompactionGroupInfo>,
    ) -> Result<()> {
        unimplemented!("kv meta store is deprecated");
    }

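    /// Applies a single delta on top of the current version for version replay, rewriting its
    /// `id`/`prev_id` to follow the current version, and returns the resulting version together
    /// with the affected compaction group ids.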
    pub async fn replay_version_delta(
        &self,
        mut version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let mut versioning_guard = self.versioning.write().await;
        version_delta.id = versioning_guard.current_version.next_version_id();
        version_delta.prev_id = versioning_guard.current_version.id;
        versioning_guard
            .current_version
            .apply_version_delta(&version_delta);

        let version_new = versioning_guard.current_version.clone();
        let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
        Ok((version_new, compaction_group_ids))
    }

    pub async fn disable_commit_epoch(&self) -> HummockVersion {
        let mut versioning_guard = self.versioning.write().await;
        versioning_guard.disable_commit_epochs = true;
        versioning_guard.current_version.clone()
    }

    pub fn metadata_manager(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    pub fn object_store_media_type(&self) -> &'static str {
        self.object_store.media_type()
    }

    pub fn update_table_id_to_table_option(
        &self,
        new_table_id_to_table_option: HashMap<TableId, TableOption>,
    ) {
        *self.table_id_to_table_option.write() = new_table_id_to_table_option;
    }

    pub fn metadata_manager_ref(&self) -> &MetadataManager {
        &self.metadata_manager
    }

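    /// Subscribes to committed-epoch updates of `table_id`. Returns the table's current committed
    /// epoch and a receiver that yields subsequent committed epochs, or an error if the table is
    /// not present in the current version.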
    pub async fn subscribe_table_committed_epoch(
        &self,
        table_id: TableId,
    ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
        let version = self.versioning.read().await;
        if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
            let (tx, rx) = unbounded_channel();
            self.table_committed_epoch_notifiers
                .lock()
                .txs
                .entry(table_id)
                .or_default()
                .push(tx);
            Ok((epoch, rx))
        } else {
            Err(anyhow!("table {} does not exist", table_id).into())
        }
    }
}

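/// Ensures the data directory under `state_store_dir` is used by this cluster only: reads the
/// stored cluster id and fails if it belongs to a different cluster, or writes this cluster's id
/// if none is stored yet.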
async fn write_exclusive_cluster_id(
    state_store_dir: &str,
    cluster_id: ClusterId,
    object_store: ObjectStoreRef,
) -> Result<()> {
    const CLUSTER_ID_DIR: &str = "cluster_id";
    const CLUSTER_ID_NAME: &str = "0";
    let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
    let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
    tracing::info!("try reading cluster_id");
    match object_store.read(&cluster_id_full_path, ..).await {
        Ok(stored_cluster_id) => {
            let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
            if cluster_id.deref() == stored_cluster_id {
                return Ok(());
            }

            Err(ObjectError::internal(format!(
                "Data directory is already used by another cluster with id {:?}, path {}.",
                stored_cluster_id, cluster_id_full_path,
            ))
            .into())
        }
        Err(e) => {
            if e.is_object_not_found_error() {
                tracing::info!("cluster_id not found, writing cluster_id");
                object_store
                    .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
                    .await?;
                return Ok(());
            }
            Err(e.into())
        }
    }
}