1use std::collections::{BTreeMap, HashMap};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19
20use anyhow::anyhow;
21use bytes::Bytes;
22use itertools::Itertools;
23use parking_lot::lock_api::RwLock;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::monitor::MonitoredRwLock;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{
29 CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
30 version_archive_dir, version_checkpoint_path,
31};
32use risingwave_meta_model::{
33 compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
34 hummock_version_stats,
35};
36use risingwave_pb::hummock::compact_task::TaskStatus;
37use risingwave_pb::hummock::{
38 HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo,
39 SubscribeCompactionEventRequest,
40};
41use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
42use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
43use tokio::sync::{Mutex, Semaphore, oneshot};
44use tonic::Streaming;
45
46use crate::MetaResult;
47use crate::hummock::CompactorManagerRef;
48use crate::hummock::compaction::CompactStatus;
49use crate::hummock::error::Result;
50use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
51use crate::hummock::manager::context::ContextInfo;
52use crate::hummock::manager::gc::{FullGcState, GcManager};
53use crate::manager::{MetaSrvEnv, MetadataManager};
54use crate::model::{ClusterId, MetadataModelError};
55use crate::rpc::metrics::MetaMetrics;
56
57mod context;
58mod gc;
59mod tests;
60mod versioning;
61pub use context::HummockVersionSafePoint;
62use versioning::*;
63pub(crate) mod checkpoint;
64mod commit_epoch;
65mod compaction;
66pub mod sequence;
67pub mod table_write_throughput_statistic;
68pub mod time_travel;
69mod timer_task;
70mod transaction;
71mod utils;
72mod worker;
73
74pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
75pub use compaction::compaction_event_loop::*;
76use compaction::*;
77pub use compaction::{GroupState, GroupStateValidator, ManualCompactionTriggerResult};
78pub(crate) use utils::*;
79
/// Per-table subscribers interested in committed-epoch advances.
///
/// Each `TableId` maps to the senders of all live subscriptions; entries are
/// pruned in `notify_deltas` when a table is dropped or all receivers are gone.
struct TableCommittedEpochNotifiers {
    txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
}
83
84impl TableCommittedEpochNotifiers {
85 fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
86 self.txs.retain(|table_id, txs| {
87 let mut is_dropped = false;
88 let mut committed_epoch = None;
89 for delta in deltas {
90 if delta.removed_table_ids.contains(table_id) {
91 is_dropped = true;
92 break;
93 }
94 if let Some(info) = delta.state_table_info_delta.get(table_id) {
95 committed_epoch = Some(info.committed_epoch);
96 }
97 }
98 if is_dropped {
99 false
100 } else if let Some(committed_epoch) = committed_epoch {
101 txs.retain(|tx| tx.send(committed_epoch).is_ok());
102 !txs.is_empty()
103 } else {
104 true
105 }
106 })
107 }
108}
109
/// Outcome of a reported compaction task, fanned out to subscribers
/// registered in `CompactionTaskReportNotifiers`.
#[derive(Clone, Debug)]
struct CompactionTaskReportResult {
    task_id: HummockCompactionTaskId,
    // Kept for debugging/inspection; not read anywhere in this module.
    #[allow(dead_code)]
    task_status: TaskStatus,
    // NOTE(review): presumably whether the task report was persisted/accepted —
    // confirm semantics at the producing call site.
    reported: bool,
}
117
/// One-shot subscribers waiting for a specific compaction task to be reported.
///
/// The entry for a task id is consumed on `notify` or discarded on `remove`.
struct CompactionTaskReportNotifiers {
    txs: HashMap<HummockCompactionTaskId, Vec<oneshot::Sender<CompactionTaskReportResult>>>,
}
121
122impl CompactionTaskReportNotifiers {
123 fn register(
124 &mut self,
125 task_id: HummockCompactionTaskId,
126 tx: oneshot::Sender<CompactionTaskReportResult>,
127 ) {
128 self.txs.entry(task_id).or_default().push(tx);
129 }
130
131 fn remove(&mut self, task_id: HummockCompactionTaskId) {
132 self.txs.remove(&task_id);
133 }
134
135 fn notify(&mut self, result: CompactionTaskReportResult) {
136 if let Some(txs) = self.txs.remove(&result.task_id) {
137 for tx in txs {
138 let _ = tx.send(result.clone());
139 }
140 }
141 }
142}
/// Meta-node component managing Hummock state: versions and their deltas,
/// compaction scheduling, checkpointing, and garbage collection.
pub struct HummockManager {
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    /// Compaction statuses and task assignments (lock monitored for hold time).
    compaction: MonitoredRwLock<Compaction>,
    /// Current version, version deltas, checkpoint and version stats.
    versioning: MonitoredRwLock<Versioning>,
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    /// Per-context info; `pinned_versions` is loaded from the meta store.
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    pub iceberg_compactor_manager: Arc<IcebergCompactorManager>,
    event_sender: HummockManagerEventSender,
    /// Object store holding the version checkpoint and archive.
    object_store: ObjectStoreRef,
    version_checkpoint_path: String,
    version_archive_dir: String,
    // Initialized to `false`; presumably pauses checkpoint writes when set —
    // confirm at the toggling call sites (outside this chunk).
    pause_version_checkpoint: AtomicBool,
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    /// Subscribers notified when a table's committed epoch advances.
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,
    /// One-shot subscribers waiting for compaction task reports.
    compaction_task_report_notifiers: parking_lot::Mutex<CompactionTaskReportNotifiers>,

    compactor_streams_change_tx:
        UnboundedSender<(HummockContextId, Streaming<SubscribeCompactionEventRequest>)>,

    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    /// Last `now` value loaded from the meta store (see `load_meta_store_state`).
    now: Mutex<u64>,
    /// Sized from `max_inflight_time_travel_query` to cap concurrent time-travel queries.
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    table_id_to_table_option: parking_lot::RwLock<HashMap<TableId, TableOption>>,
}
191
/// Shared, reference-counted handle to the [`HummockManager`].
pub type HummockManagerRef = Arc<HummockManager>;
193
194use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
195use risingwave_pb::catalog::Table;
196
/// Starts a timer on the `hummock_manager_real_process_time` histogram labeled
/// with `$func_name`; bind the returned timer guard so the duration is observed
/// when it goes out of scope.
macro_rules! start_measure_real_process_timer {
    ($hummock_mgr:expr, $func_name:literal) => {
        $hummock_mgr
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$func_name])
            .start_timer()
    };
}
206pub(crate) use start_measure_real_process_timer;
207
208use super::IcebergCompactorManager;
209use crate::controller::SqlMetaStore;
210use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
211use crate::hummock::manager::worker::HummockManagerEventSender;
212
213impl HummockManager {
    /// Creates a [`HummockManager`], loading the compaction group configuration
    /// from the meta store before delegating to `new_impl`.
    ///
    /// # Errors
    /// Returns an error if compaction group loading or manager initialization fails.
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let compaction_group_manager = CompactionGroupManager::new(&env).await?;
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
    }
235
    /// Test-only constructor that uses the provided compaction `config` instead
    /// of the configuration persisted in the meta store.
    ///
    /// # Panics
    /// Panics (`unwrap`) if the compaction group manager or the manager itself
    /// fails to initialize — acceptable in tests only.
    #[cfg(any(test, feature = "test"))]
    pub(super) async fn with_config(
        env: MetaSrvEnv,
        cluster_controller: crate::controller::cluster::ClusterControllerRef,
        catalog_controller: crate::controller::catalog::CatalogControllerRef,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        config: risingwave_pb::hummock::CompactionConfig,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> HummockManagerRef {
        let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
            .await
            .unwrap();
        let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
        .unwrap()
    }
264
    /// Shared constructor body: builds the checkpoint object store, claims the
    /// data directory for this cluster, assembles the manager, then recovers
    /// persisted state from the meta store.
    ///
    /// # Errors
    /// Fails if the data directory belongs to another cluster, if the S3 bucket
    /// has object expiration configured, or if any recovery step errors.
    async fn new_impl(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compaction_group_manager: CompactionGroupManager,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let sys_params = env.system_params_reader().await;
        let state_store_url = sys_params.state_store();

        let state_store_dir: &str = sys_params.data_directory();
        let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
        let deterministic_mode = env.opts.compaction_deterministic_test;
        let mut object_store_config = env.opts.object_store_config.clone();
        // Avoid exposing half-written checkpoint files.
        object_store_config.set_atomic_write_dir();
        let object_store = Arc::new(
            build_remote_object_store(
                // URLs without the "hummock+" prefix fall back to in-memory storage.
                state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
                metrics.object_store_metric.clone(),
                "Version Checkpoint",
                Arc::new(object_store_config),
            )
            .await,
        );
        if !deterministic_mode {
            // Claim the data directory for this cluster id, or refuse to start if
            // another cluster already owns it.
            write_exclusive_cluster_id(
                state_store_dir,
                env.cluster_id().clone(),
                object_store.clone(),
            )
            .await?;

            if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
                && !env.opts.do_not_config_object_storage_lifecycle
            {
                let is_bucket_expiration_configured =
                    s3.inner().configure_bucket_lifecycle(state_store_dir).await;
                if is_bucket_expiration_configured {
                    // Object expiration would silently delete live cluster data.
                    return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
                    Please disable object expiration and restart the cluster.")
                    .into());
                }
            }
        }
        let version_checkpoint_path = version_checkpoint_path(state_store_dir);
        let version_archive_dir = version_archive_dir(state_store_dir);
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
        let gc_manager = GcManager::new(
            object_store.clone(),
            state_store_dir,
            use_new_object_prefix_strategy,
        );

        // Retain throughput samples long enough for both split and merge decisions.
        let max_table_statistic_expired_time = std::cmp::max(
            env.opts.table_stat_throuput_window_seconds_for_split,
            env.opts.table_stat_throuput_window_seconds_for_merge,
        ) as i64;

        let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

        let instance = HummockManager {
            env,
            versioning: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::versioning",
            ),
            compaction: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::compaction",
            ),
            compaction_group_manager: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                compaction_group_manager,
                "hummock_manager::compaction_group_manager",
            ),
            context_info: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::context_info",
            ),
            metrics,
            metadata_manager,
            compactor_manager,
            iceberg_compactor_manager,
            event_sender: tx,
            object_store,
            version_checkpoint_path,
            version_archive_dir,
            pause_version_checkpoint: AtomicBool::new(false),
            table_write_throughput_statistic_manager: parking_lot::RwLock::new(
                TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
            ),
            table_committed_epoch_notifiers: parking_lot::Mutex::new(
                TableCommittedEpochNotifiers {
                    txs: Default::default(),
                },
            ),
            compaction_task_report_notifiers: parking_lot::Mutex::new(
                CompactionTaskReportNotifiers {
                    txs: Default::default(),
                },
            ),
            compactor_streams_change_tx,
            compaction_state: CompactionState::new(),
            full_gc_state: FullGcState::new().into(),
            now: Mutex::new(0),
            inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
            gc_manager,
            table_id_to_table_option: RwLock::new(HashMap::new()),
        };
        let instance = Arc::new(instance);
        // Recovery sequence: time-travel state, event worker, meta store replay,
        // then release stale contexts.
        instance.init_time_travel_state().await?;
        instance.start_worker(rx);
        instance.load_meta_store_state().await?;
        instance.release_invalid_contexts().await?;
        instance.release_meta_context().await?;
        Ok(instance)
    }
397
    /// Returns the SQL meta store backing this manager.
    fn meta_store_ref(&self) -> &SqlMetaStore {
        self.env.meta_store_ref()
    }
401
    /// Recovers in-memory Hummock state from the meta store.
    ///
    /// Acquires the `compaction`, `versioning` and `context_info` write locks —
    /// in that order — before delegating to `load_meta_store_state_impl`.
    async fn load_meta_store_state(&self) -> Result<()> {
        // Restore the persisted "now" timestamp (0 if never written).
        let now = self.load_now().await?;
        *self.now.lock().await = now.unwrap_or(0);

        let mut compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let mut context_info_guard = self.context_info.write().await;
        self.load_meta_store_state_impl(
            &mut compaction_guard,
            &mut versioning_guard,
            &mut context_info_guard,
        )
        .await
    }
417
    /// Loads compaction statuses, task assignments, version deltas, the version
    /// checkpoint and pinned versions from the meta store, then rebuilds the
    /// current version by replaying deltas on top of the checkpoint.
    ///
    /// Caller must hold the corresponding write locks (see
    /// `load_meta_store_state`).
    async fn load_meta_store_state_impl(
        &self,
        compaction_guard: &mut Compaction,
        versioning_guard: &mut Versioning,
        context_info: &mut ContextInfo,
    ) -> Result<()> {
        use sea_orm::EntityTrait;
        let meta_store = self.meta_store_ref();
        let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
            compaction_status::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();
        // Keep the in-memory defaults when nothing was persisted.
        if !compaction_statuses.is_empty() {
            compaction_guard.compaction_statuses = compaction_statuses;
        }

        compaction_guard.compact_task_assignment = compaction_task::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| {
                (
                    m.id as HummockCompactionTaskId,
                    PbCompactTaskAssignment::from(m),
                )
            })
            .collect();

        let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
            hummock_version_delta::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| {
                    (
                        m.id,
                        HummockVersionDelta::from_persisted_protobuf_owned(m.into()),
                    )
                })
                .collect();

        let checkpoint = self.try_read_checkpoint().await?;
        // Redo starts from the checkpointed version; if no checkpoint exists,
        // create and persist an initial one.
        let mut redo_state = if let Some(c) = checkpoint {
            versioning_guard.checkpoint = c;
            versioning_guard.checkpoint.version.clone()
        } else {
            let default_compaction_config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config();
            let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
            tracing::info!("init hummock version checkpoint");
            versioning_guard.checkpoint = HummockVersionCheckpoint {
                version: checkpoint_version.clone(),
                stale_objects: Default::default(),
            };
            self.write_checkpoint(&versioning_guard.checkpoint).await?;
            checkpoint_version
        };
        let mut applied_delta_count = 0;
        // Only deltas newer than the checkpointed version need to be applied.
        let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
        tracing::info!(
            total_delta = hummock_version_deltas.len(),
            total_to_apply,
            "Start redo Hummock version."
        );
        for version_delta in hummock_version_deltas
            .range(redo_state.id + 1..)
            .map(|(_, v)| v)
        {
            // Deltas must form an unbroken chain from the checkpoint.
            assert_eq!(
                version_delta.prev_id, redo_state.id,
                "delta prev_id {}, redo state id {}",
                version_delta.prev_id, redo_state.id
            );
            redo_state.apply_version_delta(version_delta);
            applied_delta_count += 1;
            if applied_delta_count % 1000 == 0 {
                tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
            }
        }
        tracing::info!("Finish redo Hummock version.");
        versioning_guard.version_stats = hummock_version_stats::Entity::find()
            .one(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .map(HummockVersionStats::from)
            .unwrap_or_else(|| HummockVersionStats {
                hummock_version_id: 0.into(),
                ..Default::default()
            });

        versioning_guard.current_version = redo_state;
        versioning_guard.hummock_version_deltas = hummock_version_deltas;

        context_info.pinned_versions = hummock_pinned_version::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.context_id as HummockContextId, m.into()))
            .collect();

        self.initial_compaction_group_config_after_load(
            versioning_guard,
            self.compaction_group_manager.write().await.deref_mut(),
        )
        .await?;

        Ok(())
    }
538
    /// Placeholder kept for API compatibility: this operation was only
    /// supported by the now-deprecated kv meta store.
    ///
    /// # Panics
    /// Always panics via `unimplemented!`.
    pub fn init_metadata_for_version_replay(
        &self,
        _table_catalogs: Vec<Table>,
        _compaction_groups: Vec<PbCompactionGroupInfo>,
    ) -> Result<()> {
        unimplemented!("kv meta store is deprecated");
    }
546
547 pub async fn replay_version_delta(
551 &self,
552 mut version_delta: HummockVersionDelta,
553 ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
554 let mut versioning_guard = self.versioning.write().await;
555 version_delta.id = versioning_guard.current_version.next_version_id();
557 version_delta.prev_id = versioning_guard.current_version.id;
558 versioning_guard
559 .current_version
560 .apply_version_delta(&version_delta);
561
562 let version_new = versioning_guard.current_version.clone();
563 let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
564 Ok((version_new, compaction_group_ids))
565 }
566
567 pub async fn disable_commit_epoch(&self) -> HummockVersion {
568 let mut versioning_guard = self.versioning.write().await;
569 versioning_guard.disable_commit_epochs = true;
570 versioning_guard.current_version.clone()
571 }
572
    /// Returns the metadata manager.
    pub fn metadata_manager(&self) -> &MetadataManager {
        &self.metadata_manager
    }
576
    /// Returns the static media-type string of the version-checkpoint object store.
    pub fn object_store_media_type(&self) -> &'static str {
        self.object_store.media_type()
    }
580
581 pub fn update_table_id_to_table_option(
582 &self,
583 new_table_id_to_table_option: HashMap<TableId, TableOption>,
584 ) {
585 *self.table_id_to_table_option.write() = new_table_id_to_table_option;
586 }
587
    /// Returns the metadata manager.
    ///
    /// NOTE(review): behaviorally identical to `metadata_manager`; consider
    /// consolidating the two accessors.
    pub fn metadata_manager_ref(&self) -> &MetadataManager {
        &self.metadata_manager
    }
591
592 pub async fn subscribe_table_committed_epoch(
593 &self,
594 table_id: TableId,
595 ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
596 let version = self.versioning.read().await;
597 if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
598 let (tx, rx) = unbounded_channel();
599 self.table_committed_epoch_notifiers
600 .lock()
601 .txs
602 .entry(table_id)
603 .or_default()
604 .push(tx);
605 Ok((epoch, rx))
606 } else {
607 Err(anyhow!("table {} not exist", table_id).into())
608 }
609 }
610}
611
612async fn write_exclusive_cluster_id(
613 state_store_dir: &str,
614 cluster_id: ClusterId,
615 object_store: ObjectStoreRef,
616) -> Result<()> {
617 const CLUSTER_ID_DIR: &str = "cluster_id";
618 const CLUSTER_ID_NAME: &str = "0";
619 let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
620 let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
621 tracing::info!("try reading cluster_id");
622 match object_store.read(&cluster_id_full_path, ..).await {
623 Ok(stored_cluster_id) => {
624 let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
625 if cluster_id.deref() == stored_cluster_id {
626 return Ok(());
627 }
628
629 Err(ObjectError::internal(format!(
630 "Data directory is already used by another cluster with id {:?}, path {}.",
631 stored_cluster_id, cluster_id_full_path,
632 ))
633 .into())
634 }
635 Err(e) => {
636 if e.is_object_not_found_error() {
637 tracing::info!("cluster_id not found, writing cluster_id");
638 object_store
639 .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
640 .await?;
641 return Ok(());
642 }
643 Err(e.into())
644 }
645 }
646}