1use std::collections::{BTreeMap, HashMap};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19
20use anyhow::anyhow;
21use bytes::Bytes;
22use itertools::Itertools;
23use parking_lot::lock_api::RwLock;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::monitor::MonitoredRwLock;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_hummock_sdk::change_log::TableChangeLog;
28use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
29use risingwave_hummock_sdk::{
30 CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
31 version_archive_dir, version_checkpoint_path,
32};
33use risingwave_meta_model::{
34 compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
35 hummock_version_stats,
36};
37use risingwave_pb::hummock::compact_task::TaskStatus;
38use risingwave_pb::hummock::{
39 HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo,
40 SubscribeCompactionEventRequest,
41};
42use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
43use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
44use tokio::sync::{Mutex, Semaphore, oneshot};
45use tonic::Streaming;
46
47use crate::MetaResult;
48use crate::hummock::CompactorManagerRef;
49use crate::hummock::compaction::CompactStatus;
50use crate::hummock::error::Result;
51use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
52use crate::hummock::manager::context::ContextInfo;
53use crate::hummock::manager::gc::{FullGcState, GcManager};
54use crate::hummock::manager::sequence::PrefetchedSequence;
55use crate::hummock::model::ext::to_table_change_log;
56use crate::manager::{MetaSrvEnv, MetadataManager};
57use crate::model::{ClusterId, MetadataModelError};
58use crate::rpc::metrics::MetaMetrics;
59
60mod context;
61mod gc;
62mod tests;
63mod versioning;
64pub use context::HummockVersionSafePoint;
65use versioning::*;
66pub(crate) mod checkpoint;
67mod commit_epoch;
68mod compaction;
69pub mod sequence;
70pub mod table_write_throughput_statistic;
71pub mod time_travel;
72mod timer_task;
73mod transaction;
74mod utils;
75mod worker;
76
77pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
78pub use compaction::compaction_event_loop::*;
79use compaction::*;
80pub use compaction::{GroupState, GroupStateValidator, ManualCompactionTriggerResult};
81pub(crate) use utils::*;
82
/// Per-table subscribers waiting for newly committed epochs
/// (registered via [`HummockManager::subscribe_table_committed_epoch`]).
struct TableCommittedEpochNotifiers {
    // One unbounded sender per subscriber, keyed by the watched table.
    txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
}
86
87impl TableCommittedEpochNotifiers {
88 fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
89 self.txs.retain(|table_id, txs| {
90 let mut is_dropped = false;
91 let mut committed_epoch = None;
92 for delta in deltas {
93 if delta.removed_table_ids.contains(table_id) {
94 is_dropped = true;
95 break;
96 }
97 if let Some(info) = delta.state_table_info_delta.get(table_id) {
98 committed_epoch = Some(info.committed_epoch);
99 }
100 }
101 if is_dropped {
102 false
103 } else if let Some(committed_epoch) = committed_epoch {
104 txs.retain(|tx| tx.send(committed_epoch).is_ok());
105 !txs.is_empty()
106 } else {
107 true
108 }
109 })
110 }
111}
112
/// Outcome of a compaction task, delivered to the one-shot waiters kept in
/// `CompactionTaskReportNotifiers`.
#[derive(Clone, Debug)]
struct CompactionTaskReportResult {
    task_id: HummockCompactionTaskId,

    task_status: TaskStatus,
    // Whether the task was reported; exact semantics are defined by the
    // producer of this struct — NOTE(review): confirm against call sites.
    reported: bool,
}
120
/// One-shot waiters keyed by compaction task id; each waiter receives a
/// `CompactionTaskReportResult` when the task's report arrives.
struct CompactionTaskReportNotifiers {
    txs: HashMap<HummockCompactionTaskId, Vec<oneshot::Sender<CompactionTaskReportResult>>>,
}
124
125impl CompactionTaskReportNotifiers {
126 fn register(
127 &mut self,
128 task_id: HummockCompactionTaskId,
129 tx: oneshot::Sender<CompactionTaskReportResult>,
130 ) {
131 self.txs.entry(task_id).or_default().push(tx);
132 }
133
134 fn remove(&mut self, task_id: HummockCompactionTaskId) {
135 self.txs.remove(&task_id);
136 }
137
138 fn notify(&mut self, result: CompactionTaskReportResult) {
139 if let Some(txs) = self.txs.remove(&result.task_id) {
140 for tx in txs {
141 let _ = tx.send(result.clone());
142 }
143 }
144 }
145}
/// Central coordinator of Hummock state-store metadata on the meta node.
///
/// Owns the in-memory copies of the Hummock version, compaction bookkeeping and
/// related state, each guarded by its own lock.
pub struct HummockManager {
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    /// Compaction statuses and compact-task assignments.
    compaction: MonitoredRwLock<Compaction>,
    /// Current version, persisted deltas, checkpoint and table change logs.
    versioning: MonitoredRwLock<Versioning>,
    /// Per-compaction-group configuration.
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    /// Context-scoped state such as pinned versions.
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    pub iceberg_compactor_manager: Arc<IcebergCompactorManager>,
    /// Sends events to the background worker started in `new_impl`.
    event_sender: HummockManagerEventSender,
    /// Object store holding version checkpoints/archives and the cluster id.
    object_store: ObjectStoreRef,
    version_checkpoint_path: String,
    version_archive_dir: String,
    /// When set, version checkpointing is paused.
    pause_version_checkpoint: AtomicBool,
    /// Per-table write-throughput statistics, windowed by the
    /// `table_stat_throuput_window_seconds_*` options (see `new_impl`).
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    /// Subscribers waiting for per-table committed-epoch updates.
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,
    /// One-shot waiters for compaction task report results.
    compaction_task_report_notifiers: parking_lot::Mutex<CompactionTaskReportNotifiers>,

    /// Forwards newly established compactor event streams to their consumer.
    compactor_streams_change_tx:
        UnboundedSender<(HummockContextId, Streaming<SubscribeCompactionEventRequest>)>,

    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    /// "Now" timestamp restored from the meta store on startup
    /// (NOTE(review): unit presumably seconds — confirm against `load_now`).
    now: Mutex<u64>,
    /// Bounds the number of concurrently running time-travel queries.
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    /// Pre-fetched sequence of compaction task ids.
    prefetched_compaction_task_ids: PrefetchedSequence,
    /// Cached table options, replaced wholesale by `update_table_id_to_table_option`.
    table_id_to_table_option: parking_lot::RwLock<HashMap<TableId, TableOption>>,
}
196
/// Shared, reference-counted handle to the [`HummockManager`].
pub type HummockManagerRef = Arc<HummockManager>;
198
199use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
200use risingwave_pb::catalog::Table;
201
/// Starts a timer on the `hummock_manager_real_process_time` metric, labeled
/// with `$func_name`. The returned guard records the elapsed time when it is
/// dropped, so bind it for the duration of the measured section.
macro_rules! start_measure_real_process_timer {
    ($hummock_mgr:expr, $func_name:literal) => {
        $hummock_mgr
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$func_name])
            .start_timer()
    };
}
pub(crate) use start_measure_real_process_timer;
212
213use super::IcebergCompactorManager;
214use crate::controller::SqlMetaStore;
215use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
216use crate::hummock::manager::worker::HummockManagerEventSender;
217
218impl HummockManager {
219 pub async fn new(
220 env: MetaSrvEnv,
221 metadata_manager: MetadataManager,
222 metrics: Arc<MetaMetrics>,
223 compactor_manager: CompactorManagerRef,
224 compactor_streams_change_tx: UnboundedSender<(
225 HummockContextId,
226 Streaming<SubscribeCompactionEventRequest>,
227 )>,
228 ) -> Result<HummockManagerRef> {
229 let compaction_group_manager = CompactionGroupManager::new(&env).await?;
230 Self::new_impl(
231 env,
232 metadata_manager,
233 metrics,
234 compactor_manager,
235 compaction_group_manager,
236 compactor_streams_change_tx,
237 )
238 .await
239 }
240
241 #[cfg(any(test, feature = "test"))]
242 pub(super) async fn with_config(
243 env: MetaSrvEnv,
244 cluster_controller: crate::controller::cluster::ClusterControllerRef,
245 catalog_controller: crate::controller::catalog::CatalogControllerRef,
246 metrics: Arc<MetaMetrics>,
247 compactor_manager: CompactorManagerRef,
248 config: risingwave_pb::hummock::CompactionConfig,
249 compactor_streams_change_tx: UnboundedSender<(
250 HummockContextId,
251 Streaming<SubscribeCompactionEventRequest>,
252 )>,
253 ) -> HummockManagerRef {
254 let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
255 .await
256 .unwrap();
257 let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
258 Self::new_impl(
259 env,
260 metadata_manager,
261 metrics,
262 compactor_manager,
263 compaction_group_manager,
264 compactor_streams_change_tx,
265 )
266 .await
267 .unwrap()
268 }
269
    /// Builds the manager: constructs the object store, claims exclusive
    /// ownership of the data directory, assembles the `HummockManager` value,
    /// then runs the post-construction initialization sequence (time travel,
    /// change logs, worker startup, meta-store state load, context release).
    ///
    /// The initialization calls at the end are order-sensitive; keep them as-is.
    async fn new_impl(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compaction_group_manager: CompactionGroupManager,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let sys_params = env.system_params_reader().await;
        let state_store_url = sys_params.state_store();

        let state_store_dir: &str = sys_params.data_directory();
        let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
        let deterministic_mode = env.opts.compaction_deterministic_test;
        let mut object_store_config = env.opts.object_store_config.clone();
        object_store_config.set_atomic_write_dir();
        // Fall back to an in-memory store when the URL has no "hummock+" prefix.
        let object_store = Arc::new(
            build_remote_object_store(
                state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
                metrics.object_store_metric.clone(),
                "Version Checkpoint",
                Arc::new(object_store_config),
            )
            .await,
        );
        if !deterministic_mode {
            // Refuse to start if the data directory belongs to another cluster.
            write_exclusive_cluster_id(
                state_store_dir,
                env.cluster_id().clone(),
                object_store.clone(),
            )
            .await?;

            // On S3, object expiration would silently delete RisingWave data:
            // abort startup if the bucket has expiration configured.
            if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
                && !env.opts.do_not_config_object_storage_lifecycle
            {
                let is_bucket_expiration_configured =
                    s3.inner().configure_bucket_lifecycle(state_store_dir).await;
                if is_bucket_expiration_configured {
                    return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
                    Please disable object expiration and restart the cluster.")
                    .into());
                }
            }
        }
        let version_checkpoint_path = version_checkpoint_path(state_store_dir);
        let version_archive_dir = version_archive_dir(state_store_dir);
        // Event channel feeding the background worker started below.
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
        let gc_manager = GcManager::new(
            object_store.clone(),
            state_store_dir,
            use_new_object_prefix_strategy,
        );

        // Keep throughput samples long enough for both split and merge windows.
        let max_table_statistic_expired_time = std::cmp::max(
            env.opts.table_stat_throuput_window_seconds_for_split,
            env.opts.table_stat_throuput_window_seconds_for_merge,
        ) as i64;

        let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

        let instance = HummockManager {
            env,
            versioning: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::versioning",
            ),
            compaction: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::compaction",
            ),
            compaction_group_manager: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                compaction_group_manager,
                "hummock_manager::compaction_group_manager",
            ),
            context_info: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::context_info",
            ),
            metrics,
            metadata_manager,
            compactor_manager,
            iceberg_compactor_manager,
            event_sender: tx,
            object_store,
            version_checkpoint_path,
            version_archive_dir,
            pause_version_checkpoint: AtomicBool::new(false),
            table_write_throughput_statistic_manager: parking_lot::RwLock::new(
                TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
            ),
            table_committed_epoch_notifiers: parking_lot::Mutex::new(
                TableCommittedEpochNotifiers {
                    txs: Default::default(),
                },
            ),
            compaction_task_report_notifiers: parking_lot::Mutex::new(
                CompactionTaskReportNotifiers {
                    txs: Default::default(),
                },
            ),
            compactor_streams_change_tx,
            compaction_state: CompactionState::new(),
            full_gc_state: FullGcState::new().into(),
            now: Mutex::new(0),
            inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
            gc_manager,
            prefetched_compaction_task_ids: PrefetchedSequence::new(),
            table_id_to_table_option: RwLock::new(HashMap::new()),
        };
        let instance = Arc::new(instance);
        instance.init_time_travel_state().await?;
        instance.may_fill_backward_table_change_logs().await?;

        // Start the worker before loading state so events can be queued.
        instance.start_worker(rx);
        instance.load_meta_store_state().await?;
        instance.release_invalid_contexts().await?;
        instance.release_meta_context().await?;
        Ok(instance)
    }
405
    /// Shorthand for the SQL meta store handle owned by `env`.
    fn meta_store_ref(&self) -> &SqlMetaStore {
        self.env.meta_store_ref()
    }
409
    /// Restores in-memory state from the meta store after (re)start.
    ///
    /// Acquires the `compaction`, `versioning` and `context_info` write locks
    /// in that order before delegating to `load_meta_store_state_impl`.
    /// NOTE(review): the acquisition order presumably matters for deadlock
    /// avoidance — keep it consistent with other multi-lock call sites.
    async fn load_meta_store_state(&self) -> Result<()> {
        // Restore the persisted "now" timestamp first; default to 0 when absent.
        let now = self.load_now().await?;
        *self.now.lock().await = now.unwrap_or(0);

        let mut compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let mut context_info_guard = self.context_info.write().await;
        self.load_meta_store_state_impl(
            &mut compaction_guard,
            &mut versioning_guard,
            &mut context_info_guard,
        )
        .await
    }
425
    /// Loads persisted state into the provided in-memory guards: compaction
    /// statuses and task assignments, version deltas (redone on top of the
    /// latest checkpoint), version stats, table change logs and pinned versions.
    async fn load_meta_store_state_impl(
        &self,
        compaction_guard: &mut Compaction,
        versioning_guard: &mut Versioning,
        context_info: &mut ContextInfo,
    ) -> Result<()> {
        use sea_orm::EntityTrait;
        let meta_store = self.meta_store_ref();
        // Per-group compaction status; only overwrite the defaults when the
        // meta store actually has rows.
        let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
            compaction_status::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();
        if !compaction_statuses.is_empty() {
            compaction_guard.compaction_statuses = compaction_statuses;
        }

        // Outstanding compact-task assignments, keyed by task id.
        compaction_guard.compact_task_assignment = compaction_task::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| {
                (
                    m.id as HummockCompactionTaskId,
                    PbCompactTaskAssignment::from(m),
                )
            })
            .collect();

        // All persisted version deltas, ordered by id (BTreeMap keeps order).
        let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
            hummock_version_delta::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| {
                    (
                        m.id,
                        HummockVersionDelta::from_persisted_protobuf_owned(m.into()),
                    )
                })
                .collect();

        // Start redo from the latest checkpoint; if there is none, create and
        // persist an initial checkpoint version first.
        let checkpoint = self.try_read_checkpoint().await?;
        let mut redo_state = if let Some(c) = checkpoint {
            versioning_guard.checkpoint = c;
            versioning_guard.checkpoint.version.clone()
        } else {
            let default_compaction_config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config();
            let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
            tracing::info!("init hummock version checkpoint");
            versioning_guard.checkpoint = HummockVersionCheckpoint {
                version: checkpoint_version.clone(),
                stale_objects: Default::default(),
            };
            self.write_checkpoint(&versioning_guard.checkpoint).await?;
            checkpoint_version
        };
        // Re-apply every delta newer than the checkpointed version.
        let mut applied_delta_count = 0;
        let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
        tracing::info!(
            total_delta = hummock_version_deltas.len(),
            total_to_apply,
            "Start redo Hummock version."
        );
        for version_delta in hummock_version_deltas
            .range(redo_state.id + 1..)
            .map(|(_, v)| v)
        {
            // Deltas must form an unbroken chain from the checkpoint.
            assert_eq!(
                version_delta.prev_id, redo_state.id,
                "delta prev_id {}, redo state id {}",
                version_delta.prev_id, redo_state.id
            );
            redo_state.apply_version_delta(version_delta);
            applied_delta_count += 1;
            if applied_delta_count % 1000 == 0 {
                tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
            }
        }
        tracing::info!("Finish redo Hummock version.");
        // Drop table ids from SST metadata that no longer exist in the version.
        let pruned_stale_table_id_count = redo_state.prune_stale_table_ids_from_ssts();
        if pruned_stale_table_id_count > 0 {
            tracing::warn!(
                pruned_stale_table_id_count,
                version_id = ?redo_state.id,
                "Pruned stale table ids from recovered Hummock SST metadata."
            );
        }
        versioning_guard.version_stats = hummock_version_stats::Entity::find()
            .one(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .map(HummockVersionStats::from)
            .unwrap_or_else(|| HummockVersionStats {
                hummock_version_id: 0.into(),
                ..Default::default()
            });

        versioning_guard.current_version = redo_state;
        versioning_guard.hummock_version_deltas = hummock_version_deltas;
        // Group change-log rows by table and order each table's log by
        // checkpoint epoch (rows are not guaranteed to be sorted).
        versioning_guard.table_change_log =
            risingwave_meta_model::hummock_table_change_log::Entity::find()
                .all(&self.env.meta_store_ref().conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.table_id, to_table_change_log(m)))
                .into_group_map()
                .into_iter()
                .map(|(table_id, unordered_change_logs)| {
                    (
                        table_id,
                        TableChangeLog::new(
                            unordered_change_logs
                                .into_iter()
                                .sorted_by_key(|l| l.checkpoint_epoch),
                        ),
                    )
                })
                .collect();

        // Versions pinned by contexts (e.g. frontends), keyed by context id.
        context_info.pinned_versions = hummock_pinned_version::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.context_id as HummockContextId, m.into()))
            .collect();

        self.initial_compaction_group_config_after_load(
            versioning_guard,
            self.compaction_group_manager.write().await.deref_mut(),
        )
        .await?;

        Ok(())
    }
574
    /// Stub retained for API compatibility: version-replay metadata setup was
    /// only supported by the deprecated kv meta store, so this always panics.
    pub fn init_metadata_for_version_replay(
        &self,
        _table_catalogs: Vec<Table>,
        _compaction_groups: Vec<PbCompactionGroupInfo>,
    ) -> Result<()> {
        unimplemented!("kv meta store is deprecated");
    }
582
583 pub async fn replay_version_delta(
587 &self,
588 mut version_delta: HummockVersionDelta,
589 ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
590 let mut versioning_guard = self.versioning.write().await;
591 version_delta.id = versioning_guard.current_version.next_version_id();
593 version_delta.prev_id = versioning_guard.current_version.id;
594 versioning_guard
595 .current_version
596 .apply_version_delta(&version_delta);
597
598 let version_new = versioning_guard.current_version.clone();
599 let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
600 Ok((version_new, compaction_group_ids))
601 }
602
603 pub async fn disable_commit_epoch(&self) -> HummockVersion {
604 let mut versioning_guard = self.versioning.write().await;
605 versioning_guard.disable_commit_epochs = true;
606 versioning_guard.current_version.clone()
607 }
608
    /// Returns a reference to the metadata manager.
    pub fn metadata_manager(&self) -> &MetadataManager {
        &self.metadata_manager
    }
612
    /// Returns the media type string of the underlying object store.
    pub fn object_store_media_type(&self) -> &'static str {
        self.object_store.media_type()
    }
616
617 pub fn update_table_id_to_table_option(
618 &self,
619 new_table_id_to_table_option: HashMap<TableId, TableOption>,
620 ) {
621 *self.table_id_to_table_option.write() = new_table_id_to_table_option;
622 }
623
    /// Alias of [`Self::metadata_manager`]; kept for caller compatibility.
    pub fn metadata_manager_ref(&self) -> &MetadataManager {
        &self.metadata_manager
    }
627
628 pub async fn subscribe_table_committed_epoch(
629 &self,
630 table_id: TableId,
631 ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
632 let version = self.versioning.read().await;
633 if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
634 let (tx, rx) = unbounded_channel();
635 self.table_committed_epoch_notifiers
636 .lock()
637 .txs
638 .entry(table_id)
639 .or_default()
640 .push(tx);
641 Ok((epoch, rx))
642 } else {
643 Err(anyhow!("table {} not exist", table_id).into())
644 }
645 }
646}
647
648async fn write_exclusive_cluster_id(
649 state_store_dir: &str,
650 cluster_id: ClusterId,
651 object_store: ObjectStoreRef,
652) -> Result<()> {
653 const CLUSTER_ID_DIR: &str = "cluster_id";
654 const CLUSTER_ID_NAME: &str = "0";
655 let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
656 let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
657 tracing::info!("try reading cluster_id");
658 match object_store.read(&cluster_id_full_path, ..).await {
659 Ok(stored_cluster_id) => {
660 let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
661 if cluster_id.deref() == stored_cluster_id {
662 return Ok(());
663 }
664
665 Err(ObjectError::internal(format!(
666 "Data directory is already used by another cluster with id {:?}, path {}.",
667 stored_cluster_id, cluster_id_full_path,
668 ))
669 .into())
670 }
671 Err(e) => {
672 if e.is_object_not_found_error() {
673 tracing::info!("cluster_id not found, writing cluster_id");
674 object_store
675 .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
676 .await?;
677 return Ok(());
678 }
679 Err(e.into())
680 }
681 }
682}