1use std::collections::{BTreeMap, HashMap};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19
20use anyhow::anyhow;
21use bytes::Bytes;
22use itertools::Itertools;
23use parking_lot::lock_api::RwLock;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::monitor::MonitoredRwLock;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_hummock_sdk::change_log::TableChangeLog;
28use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
29use risingwave_hummock_sdk::{
30 CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
31 version_archive_dir, version_checkpoint_path,
32};
33use risingwave_meta_model::{
34 compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
35 hummock_version_stats,
36};
37use risingwave_pb::hummock::compact_task::TaskStatus;
38use risingwave_pb::hummock::{
39 HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo,
40 SubscribeCompactionEventRequest,
41};
42use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
43use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
44use tokio::sync::{Mutex, Semaphore, oneshot};
45use tonic::Streaming;
46
47use crate::MetaResult;
48use crate::hummock::CompactorManagerRef;
49use crate::hummock::compaction::CompactStatus;
50use crate::hummock::error::Result;
51use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
52use crate::hummock::manager::context::ContextInfo;
53use crate::hummock::manager::gc::{FullGcState, GcManager};
54use crate::hummock::manager::sequence::PrefetchedSequence;
55use crate::hummock::model::ext::to_table_change_log;
56use crate::manager::{MetaSrvEnv, MetadataManager};
57use crate::model::{ClusterId, MetadataModelError};
58use crate::rpc::metrics::MetaMetrics;
59
60mod context;
61mod gc;
62mod tests;
63mod versioning;
64pub use context::HummockVersionSafePoint;
65use versioning::*;
66pub(crate) mod checkpoint;
67mod commit_epoch;
68mod compaction;
69pub mod sequence;
70pub mod table_write_throughput_statistic;
71pub mod time_travel;
72mod timer_task;
73mod transaction;
74mod utils;
75mod worker;
76
77pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
78pub use compaction::compaction_event_loop::*;
79use compaction::*;
80pub use compaction::{GroupState, GroupStateValidator, ManualCompactionTriggerResult};
81pub(crate) use utils::*;
82
/// Per-table subscribers waiting for committed-epoch updates.
/// Fan-out to the channels happens in `notify_deltas`.
struct TableCommittedEpochNotifiers {
    // table id -> open subscriber channels for that table.
    txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
}
86
87impl TableCommittedEpochNotifiers {
88 fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
89 self.txs.retain(|table_id, txs| {
90 let mut is_dropped = false;
91 let mut committed_epoch = None;
92 for delta in deltas {
93 if delta.removed_table_ids.contains(table_id) {
94 is_dropped = true;
95 break;
96 }
97 if let Some(info) = delta.state_table_info_delta.get(table_id) {
98 committed_epoch = Some(info.committed_epoch);
99 }
100 }
101 if is_dropped {
102 false
103 } else if let Some(committed_epoch) = committed_epoch {
104 txs.retain(|tx| tx.send(committed_epoch).is_ok());
105 !txs.is_empty()
106 } else {
107 true
108 }
109 })
110 }
111}
112
/// Outcome of a compaction task report, delivered to the one-shot
/// subscribers registered in `CompactionTaskReportNotifiers`.
#[derive(Clone, Debug)]
struct CompactionTaskReportResult {
    task_id: HummockCompactionTaskId,
    // Final status of the task; currently unused by readers in this file.
    #[allow(dead_code)]
    task_status: TaskStatus,
    // NOTE(review): presumably whether the report was actually applied /
    // accepted — confirm against the call sites that construct this.
    reported: bool,
}
120
/// One-shot subscribers keyed by compaction task id, notified once when the
/// task's report result becomes available.
struct CompactionTaskReportNotifiers {
    // task id -> pending one-shot senders for that task.
    txs: HashMap<HummockCompactionTaskId, Vec<oneshot::Sender<CompactionTaskReportResult>>>,
}
124
125impl CompactionTaskReportNotifiers {
126 fn register(
127 &mut self,
128 task_id: HummockCompactionTaskId,
129 tx: oneshot::Sender<CompactionTaskReportResult>,
130 ) {
131 self.txs.entry(task_id).or_default().push(tx);
132 }
133
134 fn remove(&mut self, task_id: HummockCompactionTaskId) {
135 self.txs.remove(&task_id);
136 }
137
138 fn notify(&mut self, result: CompactionTaskReportResult) {
139 if let Some(txs) = self.txs.remove(&result.task_id) {
140 for tx in txs {
141 let _ = tx.send(result.clone());
142 }
143 }
144 }
145}
/// Meta-node component owning Hummock (state-store) metadata: versions and
/// their deltas, compaction state, contexts, GC and checkpointing.
pub struct HummockManager {
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    // The write locks below are acquired in the order
    // `compaction` -> `versioning` -> `context_info` in
    // `load_meta_store_state`; keep any new code consistent with that order.
    compaction: MonitoredRwLock<Compaction>,
    versioning: MonitoredRwLock<Versioning>,
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    pub iceberg_compactor_manager: Arc<IcebergCompactorManager>,
    // Sender half of the worker event loop started in `new_impl`.
    event_sender: HummockManagerEventSender,
    object_store: ObjectStoreRef,
    version_checkpoint_path: String,
    version_archive_dir: String,
    // When true, version checkpointing is paused.
    pause_version_checkpoint: AtomicBool,
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    // Subscribers of per-table committed-epoch updates.
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,
    // One-shot subscribers of compaction-task report results.
    compaction_task_report_notifiers: parking_lot::Mutex<CompactionTaskReportNotifiers>,

    compactor_streams_change_tx:
        UnboundedSender<(HummockContextId, Streaming<SubscribeCompactionEventRequest>)>,

    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    // Monotonic "now" restored from the meta store in `load_meta_store_state`.
    now: Mutex<u64>,
    // Caps concurrent time-travel queries (sized from
    // `max_inflight_time_travel_query`).
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    prefetched_compaction_task_ids: PrefetchedSequence,
    table_id_to_table_option: parking_lot::RwLock<HashMap<TableId, TableOption>>,
}
196
197pub type HummockManagerRef = Arc<HummockManager>;
198
199use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
200use risingwave_pb::catalog::Table;
201
/// Starts (and returns) a timer on the manager's
/// `hummock_manager_real_process_time` metric, labeled with `$func_name`;
/// the caller keeps the returned timer alive for the span being measured.
macro_rules! start_measure_real_process_timer {
    ($hummock_mgr:expr, $func_name:literal) => {
        $hummock_mgr
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$func_name])
            .start_timer()
    };
}
211pub(crate) use start_measure_real_process_timer;
212
213use super::IcebergCompactorManager;
214use crate::controller::SqlMetaStore;
215use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
216use crate::hummock::manager::worker::HummockManagerEventSender;
217
impl HummockManager {
    /// Creates a started [`HummockManager`] using the compaction-group
    /// configuration loaded by [`CompactionGroupManager::new`].
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let compaction_group_manager = CompactionGroupManager::new(&env).await?;
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
    }

    /// Test-only constructor that injects an explicit `CompactionConfig` and
    /// builds the `MetadataManager` from the given controllers. Panics
    /// (`unwrap`) on any initialization failure.
    #[cfg(any(test, feature = "test"))]
    pub(super) async fn with_config(
        env: MetaSrvEnv,
        cluster_controller: crate::controller::cluster::ClusterControllerRef,
        catalog_controller: crate::controller::catalog::CatalogControllerRef,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        config: risingwave_pb::hummock::CompactionConfig,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> HummockManagerRef {
        let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
            .await
            .unwrap();
        let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
        .unwrap()
    }

    /// Shared construction path: builds the version-checkpoint object store,
    /// claims the data directory for this cluster, assembles the manager,
    /// starts the event worker, and restores in-memory state from the meta
    /// store.
    async fn new_impl(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compaction_group_manager: CompactionGroupManager,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let sys_params = env.system_params_reader().await;
        let state_store_url = sys_params.state_store();

        let state_store_dir: &str = sys_params.data_directory();
        let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
        let deterministic_mode = env.opts.compaction_deterministic_test;
        let mut object_store_config = env.opts.object_store_config.clone();
        object_store_config.set_atomic_write_dir();
        let object_store = Arc::new(
            build_remote_object_store(
                // Falls back to "memory" when the url lacks the "hummock+"
                // prefix.
                state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
                metrics.object_store_metric.clone(),
                "Version Checkpoint",
                Arc::new(object_store_config),
            )
            .await,
        );
        // Skipped in deterministic compaction tests.
        if !deterministic_mode {
            // Claim the data directory: refuse to start if it is already
            // used by a different cluster id.
            write_exclusive_cluster_id(
                state_store_dir,
                env.cluster_id().clone(),
                object_store.clone(),
            )
            .await?;

            if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
                && !env.opts.do_not_config_object_storage_lifecycle
            {
                // Bucket object expiration would delete RisingWave data, so
                // starting with it enabled is a hard error.
                let is_bucket_expiration_configured =
                    s3.inner().configure_bucket_lifecycle(state_store_dir).await;
                if is_bucket_expiration_configured {
                    return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
                    Please disable object expiration and restart the cluster.")
                    .into());
                }
            }
        }
        let version_checkpoint_path = version_checkpoint_path(state_store_dir);
        let version_archive_dir = version_archive_dir(state_store_dir);
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
        let gc_manager = GcManager::new(
            object_store.clone(),
            state_store_dir,
            use_new_object_prefix_strategy,
        );

        // Keep throughput samples long enough for both the split and the
        // merge policies, whichever window is larger.
        let max_table_statistic_expired_time = std::cmp::max(
            env.opts.table_stat_throuput_window_seconds_for_split,
            env.opts.table_stat_throuput_window_seconds_for_merge,
        ) as i64;

        let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

        let instance = HummockManager {
            env,
            versioning: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::versioning",
            ),
            compaction: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::compaction",
            ),
            compaction_group_manager: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                compaction_group_manager,
                "hummock_manager::compaction_group_manager",
            ),
            context_info: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::context_info",
            ),
            metrics,
            metadata_manager,
            compactor_manager,
            iceberg_compactor_manager,
            event_sender: tx,
            object_store,
            version_checkpoint_path,
            version_archive_dir,
            pause_version_checkpoint: AtomicBool::new(false),
            table_write_throughput_statistic_manager: parking_lot::RwLock::new(
                TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
            ),
            table_committed_epoch_notifiers: parking_lot::Mutex::new(
                TableCommittedEpochNotifiers {
                    txs: Default::default(),
                },
            ),
            compaction_task_report_notifiers: parking_lot::Mutex::new(
                CompactionTaskReportNotifiers {
                    txs: Default::default(),
                },
            ),
            compactor_streams_change_tx,
            compaction_state: CompactionState::new(),
            full_gc_state: FullGcState::new().into(),
            now: Mutex::new(0),
            inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
            gc_manager,
            prefetched_compaction_task_ids: PrefetchedSequence::new(),
            table_id_to_table_option: RwLock::new(HashMap::new()),
        };
        let instance = Arc::new(instance);
        instance.init_time_travel_state().await?;
        instance.may_fill_backward_table_change_logs().await?;

        // Worker consumes the receiver half of the event channel above.
        instance.start_worker(rx);
        instance.load_meta_store_state().await?;
        instance.release_invalid_contexts().await?;
        instance.release_meta_context().await?;
        Ok(instance)
    }

    /// Shorthand for the env's SQL meta store handle.
    fn meta_store_ref(&self) -> &SqlMetaStore {
        self.env.meta_store_ref()
    }

    /// Restores `now` and the compaction/versioning/context state from the
    /// meta store. Acquires the `compaction`, `versioning` and
    /// `context_info` write locks — in that order — for the whole load.
    async fn load_meta_store_state(&self) -> Result<()> {
        let now = self.load_now().await?;
        *self.now.lock().await = now.unwrap_or(0);

        let mut compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let mut context_info_guard = self.context_info.write().await;
        self.load_meta_store_state_impl(
            &mut compaction_guard,
            &mut versioning_guard,
            &mut context_info_guard,
        )
        .await
    }

    /// Loads compaction statuses, task assignments, version deltas, the
    /// latest checkpoint, version stats, table change logs and pinned
    /// versions from the SQL meta store, then replays all deltas newer than
    /// the checkpoint to rebuild the current version.
    async fn load_meta_store_state_impl(
        &self,
        compaction_guard: &mut Compaction,
        versioning_guard: &mut Versioning,
        context_info: &mut ContextInfo,
    ) -> Result<()> {
        use sea_orm::EntityTrait;
        let meta_store = self.meta_store_ref();
        let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
            compaction_status::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();
        // Keep whatever the guard already holds when nothing was persisted.
        if !compaction_statuses.is_empty() {
            compaction_guard.compaction_statuses = compaction_statuses;
        }

        compaction_guard.compact_task_assignment = compaction_task::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| {
                (
                    m.id as HummockCompactionTaskId,
                    PbCompactTaskAssignment::from(m),
                )
            })
            .collect();

        let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
            hummock_version_delta::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| {
                    (
                        m.id,
                        HummockVersionDelta::from_persisted_protobuf_owned(m.into()),
                    )
                })
                .collect();

        // Start the redo from the persisted checkpoint; on a fresh cluster,
        // create and persist an initial checkpoint instead.
        let checkpoint = self.try_read_checkpoint().await?;
        let mut redo_state = if let Some(c) = checkpoint {
            versioning_guard.checkpoint = c;
            versioning_guard.checkpoint.version.clone()
        } else {
            let default_compaction_config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config();
            let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
            tracing::info!("init hummock version checkpoint");
            versioning_guard.checkpoint = HummockVersionCheckpoint {
                version: checkpoint_version.clone(),
                stale_objects: Default::default(),
            };
            self.write_checkpoint(&versioning_guard.checkpoint).await?;
            checkpoint_version
        };
        let mut applied_delta_count = 0;
        let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
        tracing::info!(
            total_delta = hummock_version_deltas.len(),
            total_to_apply,
            "Start redo Hummock version."
        );
        // Replay only deltas strictly newer than the checkpointed version.
        for version_delta in hummock_version_deltas
            .range(redo_state.id + 1..)
            .map(|(_, v)| v)
        {
            // Deltas must form an unbroken chain from the redo state.
            assert_eq!(
                version_delta.prev_id, redo_state.id,
                "delta prev_id {}, redo state id {}",
                version_delta.prev_id, redo_state.id
            );
            redo_state.apply_version_delta(version_delta);
            applied_delta_count += 1;
            // Progress log every 1000 applied deltas.
            if applied_delta_count % 1000 == 0 {
                tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
            }
        }
        tracing::info!("Finish redo Hummock version.");
        versioning_guard.version_stats = hummock_version_stats::Entity::find()
            .one(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .map(HummockVersionStats::from)
            .unwrap_or_else(|| HummockVersionStats {
                hummock_version_id: 0.into(),
                ..Default::default()
            });

        versioning_guard.current_version = redo_state;
        versioning_guard.hummock_version_deltas = hummock_version_deltas;
        // Change-log rows are unordered in the store; group them per table
        // and sort each group by checkpoint epoch before building the log.
        versioning_guard.table_change_log =
            risingwave_meta_model::hummock_table_change_log::Entity::find()
                .all(&self.env.meta_store_ref().conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.table_id, to_table_change_log(m)))
                .into_group_map()
                .into_iter()
                .map(|(table_id, unordered_change_logs)| {
                    (
                        table_id,
                        TableChangeLog::new(
                            unordered_change_logs
                                .into_iter()
                                .sorted_by_key(|l| l.checkpoint_epoch),
                        ),
                    )
                })
                .collect();

        context_info.pinned_versions = hummock_pinned_version::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.context_id as HummockContextId, m.into()))
            .collect();

        self.initial_compaction_group_config_after_load(
            versioning_guard,
            self.compaction_group_manager.write().await.deref_mut(),
        )
        .await?;

        Ok(())
    }

    /// KV meta store path is deprecated; always panics via `unimplemented!`.
    pub fn init_metadata_for_version_replay(
        &self,
        _table_catalogs: Vec<Table>,
        _compaction_groups: Vec<PbCompactionGroupInfo>,
    ) -> Result<()> {
        unimplemented!("kv meta store is deprecated");
    }

    /// Applies `version_delta` on top of the current version, rewriting its
    /// `id`/`prev_id` to follow the current head. Returns the resulting
    /// version and the compaction-group ids the delta touched.
    pub async fn replay_version_delta(
        &self,
        mut version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let mut versioning_guard = self.versioning.write().await;
        version_delta.id = versioning_guard.current_version.next_version_id();
        version_delta.prev_id = versioning_guard.current_version.id;
        versioning_guard
            .current_version
            .apply_version_delta(&version_delta);

        let version_new = versioning_guard.current_version.clone();
        let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
        Ok((version_new, compaction_group_ids))
    }

    /// Disables further epoch commits and returns the current version.
    pub async fn disable_commit_epoch(&self) -> HummockVersion {
        let mut versioning_guard = self.versioning.write().await;
        versioning_guard.disable_commit_epochs = true;
        versioning_guard.current_version.clone()
    }

    pub fn metadata_manager(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    /// Media type reported by the backing object store.
    pub fn object_store_media_type(&self) -> &'static str {
        self.object_store.media_type()
    }

    /// Replaces the cached table-id -> `TableOption` map wholesale.
    pub fn update_table_id_to_table_option(
        &self,
        new_table_id_to_table_option: HashMap<TableId, TableOption>,
    ) {
        *self.table_id_to_table_option.write() = new_table_id_to_table_option;
    }

    pub fn metadata_manager_ref(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    /// Subscribes to committed-epoch updates of `table_id`. Returns the
    /// table's current committed epoch plus a receiver for subsequent
    /// updates, or an error when the table is absent from the current
    /// version.
    pub async fn subscribe_table_committed_epoch(
        &self,
        table_id: TableId,
    ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
        let version = self.versioning.read().await;
        if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
            let (tx, rx) = unbounded_channel();
            self.table_committed_epoch_notifiers
                .lock()
                .txs
                .entry(table_id)
                .or_default()
                .push(tx);
            Ok((epoch, rx))
        } else {
            Err(anyhow!("table {} not exist", table_id).into())
        }
    }
}
639
640async fn write_exclusive_cluster_id(
641 state_store_dir: &str,
642 cluster_id: ClusterId,
643 object_store: ObjectStoreRef,
644) -> Result<()> {
645 const CLUSTER_ID_DIR: &str = "cluster_id";
646 const CLUSTER_ID_NAME: &str = "0";
647 let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
648 let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
649 tracing::info!("try reading cluster_id");
650 match object_store.read(&cluster_id_full_path, ..).await {
651 Ok(stored_cluster_id) => {
652 let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
653 if cluster_id.deref() == stored_cluster_id {
654 return Ok(());
655 }
656
657 Err(ObjectError::internal(format!(
658 "Data directory is already used by another cluster with id {:?}, path {}.",
659 stored_cluster_id, cluster_id_full_path,
660 ))
661 .into())
662 }
663 Err(e) => {
664 if e.is_object_not_found_error() {
665 tracing::info!("cluster_id not found, writing cluster_id");
666 object_store
667 .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
668 .await?;
669 return Ok(());
670 }
671 Err(e.into())
672 }
673 }
674}