1use std::collections::{BTreeMap, HashMap};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19
20use anyhow::anyhow;
21use bytes::Bytes;
22use itertools::Itertools;
23use parking_lot::lock_api::RwLock;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::monitor::MonitoredRwLock;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_hummock_sdk::change_log::TableChangeLog;
28use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
29use risingwave_hummock_sdk::{
30 CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
31 version_archive_dir, version_checkpoint_path,
32};
33use risingwave_meta_model::{
34 compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
35 hummock_version_stats,
36};
37use risingwave_pb::hummock::compact_task::TaskStatus;
38use risingwave_pb::hummock::{
39 HummockVersionStats, PbCompactionGroupInfo, SubscribeCompactionEventRequest,
40};
41use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
42use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
43use tokio::sync::{Mutex, Semaphore, oneshot};
44use tonic::Streaming;
45
46use crate::MetaResult;
47use crate::hummock::CompactorManagerRef;
48use crate::hummock::compaction::CompactStatus;
49use crate::hummock::error::Result;
50use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
51use crate::hummock::manager::context::ContextInfo;
52use crate::hummock::manager::gc::{FullGcState, GcManager};
53use crate::hummock::manager::sequence::PrefetchedSequence;
54use crate::hummock::model::ext::{compaction_task_model_to_assignment, to_table_change_log};
55use crate::manager::{MetaSrvEnv, MetadataManager};
56use crate::model::{ClusterId, MetadataModelError};
57use crate::rpc::metrics::MetaMetrics;
58
59mod context;
60mod gc;
61mod tests;
62mod versioning;
63pub use context::HummockVersionSafePoint;
64use versioning::*;
65pub(crate) mod checkpoint;
66mod commit_epoch;
67mod compaction;
68pub mod sequence;
69pub mod table_write_throughput_statistic;
70pub mod time_travel;
71mod timer_task;
72mod transaction;
73mod utils;
74mod worker;
75
76pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
77pub use compaction::compaction_event_loop::*;
78use compaction::*;
79pub use compaction::{GroupState, GroupStateValidator, ManualCompactionTriggerResult};
80pub(crate) use utils::*;
81
/// Registry of subscribers waiting for committed-epoch updates, grouped by table.
struct TableCommittedEpochNotifiers {
    /// Senders registered per table; each one is sent the table's committed epoch as a `u64`.
    txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
}
85
86impl TableCommittedEpochNotifiers {
87 fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
88 self.txs.retain(|table_id, txs| {
89 let mut is_dropped = false;
90 let mut committed_epoch = None;
91 for delta in deltas {
92 if delta.removed_table_ids.contains(table_id) {
93 is_dropped = true;
94 break;
95 }
96 if let Some(info) = delta.state_table_info_delta.get(table_id) {
97 committed_epoch = Some(info.committed_epoch);
98 }
99 }
100 if is_dropped {
101 false
102 } else if let Some(committed_epoch) = committed_epoch {
103 txs.retain(|tx| tx.send(committed_epoch).is_ok());
104 !txs.is_empty()
105 } else {
106 true
107 }
108 })
109 }
110}
111
/// Result value delivered to waiters registered in `CompactionTaskReportNotifiers`.
#[derive(Clone, Debug)]
struct CompactionTaskReportResult {
    /// Id of the compaction task; `CompactionTaskReportNotifiers::notify` uses it as lookup key.
    task_id: HummockCompactionTaskId,

    /// Task status carried along with the result.
    task_status: TaskStatus,
    /// NOTE(review): presumably whether the task report was actually applied; the field is set
    /// outside this part of the file — confirm against the producer.
    reported: bool,
}
119
/// Registry of one-shot waiters for compaction task report results, grouped by task id.
struct CompactionTaskReportNotifiers {
    /// Pending one-shot senders per compaction task.
    txs: HashMap<HummockCompactionTaskId, Vec<oneshot::Sender<CompactionTaskReportResult>>>,
}
123
124impl CompactionTaskReportNotifiers {
125 fn register(
126 &mut self,
127 task_id: HummockCompactionTaskId,
128 tx: oneshot::Sender<CompactionTaskReportResult>,
129 ) {
130 self.txs.entry(task_id).or_default().push(tx);
131 }
132
133 fn remove(&mut self, task_id: HummockCompactionTaskId) {
134 self.txs.remove(&task_id);
135 }
136
137 fn notify(&mut self, result: CompactionTaskReportResult) {
138 if let Some(txs) = self.txs.remove(&result.task_id) {
139 for tx in txs {
140 let _ = tx.send(result.clone());
141 }
142 }
143 }
144}
/// Meta-side manager of Hummock state: versions, compaction, pinned contexts and GC.
///
/// Constructed through `HummockManager::new` and shared as `HummockManagerRef`.
pub struct HummockManager {
    /// Meta service environment; source of system params, options and the meta store handle.
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    /// Compaction statuses and task assignments, reloaded from the meta store on startup.
    ///
    /// `load_meta_store_state` takes the write locks in the order `compaction`, `versioning`,
    /// `context_info`.
    compaction: MonitoredRwLock<Compaction>,
    /// Current version, checkpoint, version deltas, stats and table change logs.
    versioning: MonitoredRwLock<Versioning>,
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    /// Per-context info such as pinned versions.
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    pub iceberg_compactor_manager: Arc<IcebergCompactorManager>,
    /// Sending half of the channel whose receiver is handed to `start_worker` at construction.
    event_sender: HummockManagerEventSender,
    /// Object store built from the state store URL; used e.g. for version checkpoints.
    object_store: ObjectStoreRef,
    /// Derived from the data directory via `version_checkpoint_path`.
    version_checkpoint_path: String,
    /// Derived from the data directory via `version_archive_dir`.
    version_archive_dir: String,
    /// Initialised to `false`.
    pause_version_checkpoint: AtomicBool,
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    /// Subscribers registered through `subscribe_table_committed_epoch`.
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,
    compaction_task_report_notifiers: parking_lot::Mutex<CompactionTaskReportNotifiers>,

    /// Channel carrying `(context id, compaction event request stream)` pairs.
    compactor_streams_change_tx:
        UnboundedSender<(HummockContextId, Streaming<SubscribeCompactionEventRequest>)>,

    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    /// Initialised to 0 and then overwritten by `load_meta_store_state` with the value returned
    /// by `load_now` (or 0 when there is none).
    now: Mutex<u64>,
    /// Semaphore sized by `env.opts.max_inflight_time_travel_query`.
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    prefetched_compaction_task_ids: PrefetchedSequence,
    /// Replaced wholesale by `update_table_id_to_table_option`.
    table_id_to_table_option: parking_lot::RwLock<HashMap<TableId, TableOption>>,
}
195
/// Shared, reference-counted handle to a [`HummockManager`].
pub type HummockManagerRef = Arc<HummockManager>;
197
198use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
199use risingwave_pb::catalog::Table;
200
/// Starts a timer on the `hummock_manager_real_process_time` metric of `$hummock_mgr`,
/// labelled with the literal `$func_name`, and evaluates to whatever `start_timer()` returns.
macro_rules! start_measure_real_process_timer {
    ($hummock_mgr:expr, $func_name:literal) => {
        $hummock_mgr
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$func_name])
            .start_timer()
    };
}
210pub(crate) use start_measure_real_process_timer;
211
212use super::IcebergCompactorManager;
213use crate::controller::SqlMetaStore;
214use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
215use crate::hummock::manager::worker::HummockManagerEventSender;
216
217impl HummockManager {
218 pub async fn new(
219 env: MetaSrvEnv,
220 metadata_manager: MetadataManager,
221 metrics: Arc<MetaMetrics>,
222 compactor_manager: CompactorManagerRef,
223 compactor_streams_change_tx: UnboundedSender<(
224 HummockContextId,
225 Streaming<SubscribeCompactionEventRequest>,
226 )>,
227 ) -> Result<HummockManagerRef> {
228 let compaction_group_manager = CompactionGroupManager::new(&env).await?;
229 Self::new_impl(
230 env,
231 metadata_manager,
232 metrics,
233 compactor_manager,
234 compaction_group_manager,
235 compactor_streams_change_tx,
236 )
237 .await
238 }
239
/// Test-only constructor (compiled under `cfg(test)` or the `test` feature) that builds the
/// compaction group manager from an explicit compaction `config`.
///
/// # Panics
///
/// Panics if the compaction group manager or the manager itself cannot be created.
#[cfg(any(test, feature = "test"))]
pub(super) async fn with_config(
    env: MetaSrvEnv,
    cluster_controller: crate::controller::cluster::ClusterControllerRef,
    catalog_controller: crate::controller::catalog::CatalogControllerRef,
    metrics: Arc<MetaMetrics>,
    compactor_manager: CompactorManagerRef,
    config: risingwave_pb::hummock::CompactionConfig,
    compactor_streams_change_tx: UnboundedSender<(
        HummockContextId,
        Streaming<SubscribeCompactionEventRequest>,
    )>,
) -> HummockManagerRef {
    let group_manager_result = CompactionGroupManager::new_with_config(&env, config).await;
    let compaction_group_manager = group_manager_result.unwrap();
    let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
    let manager_result = Self::new_impl(
        env,
        metadata_manager,
        metrics,
        compactor_manager,
        compaction_group_manager,
        compactor_streams_change_tx,
    )
    .await;
    manager_result.unwrap()
}
268
/// Shared constructor behind `new` and `with_config`.
///
/// Builds the object store from the system parameters, claims the data directory for this
/// cluster (unless running the deterministic compaction test), assembles the manager, and then
/// runs the startup sequence: time travel state, table change log back-fill, worker start,
/// meta store state load, and release of invalid / meta contexts.
///
/// Returns an error if any of those steps fails, or if the S3 bucket is reported as having
/// object expiration configured.
async fn new_impl(
    env: MetaSrvEnv,
    metadata_manager: MetadataManager,
    metrics: Arc<MetaMetrics>,
    compactor_manager: CompactorManagerRef,
    compaction_group_manager: CompactionGroupManager,
    compactor_streams_change_tx: UnboundedSender<(
        HummockContextId,
        Streaming<SubscribeCompactionEventRequest>,
    )>,
) -> Result<HummockManagerRef> {
    let sys_params = env.system_params_reader().await;
    let state_store_url = sys_params.state_store();

    let state_store_dir: &str = sys_params.data_directory();
    let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
    let deterministic_mode = env.opts.compaction_deterministic_test;
    let mut object_store_config = env.opts.object_store_config.clone();
    object_store_config.set_atomic_write_dir();
    // A URL without the `hummock+` prefix is replaced by "memory" before being handed to the
    // object store builder.
    let object_store = Arc::new(
        build_remote_object_store(
            state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
            metrics.object_store_metric.clone(),
            "Version Checkpoint",
            Arc::new(object_store_config),
        )
        .await,
    );
    // Both the cluster id claim and the bucket lifecycle check are skipped when
    // `compaction_deterministic_test` is set.
    if !deterministic_mode {
        write_exclusive_cluster_id(
            state_store_dir,
            env.cluster_id().clone(),
            object_store.clone(),
        )
        .await?;

        // Only applies to S3-backed stores, and only when the lifecycle configuration has not
        // been opted out of.
        if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
            && !env.opts.do_not_config_object_storage_lifecycle
        {
            let is_bucket_expiration_configured =
                s3.inner().configure_bucket_lifecycle(state_store_dir).await;
            if is_bucket_expiration_configured {
                return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
 Please disable object expiration and restart the cluster.")
                .into());
            }
        }
    }
    let version_checkpoint_path = version_checkpoint_path(state_store_dir);
    let version_archive_dir = version_archive_dir(state_store_dir);
    // `tx` is stored as `event_sender`; `rx` is given to the worker below.
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
    let gc_manager = GcManager::new(
        object_store.clone(),
        state_store_dir,
        use_new_object_prefix_strategy,
    );

    // Use the larger of the split and merge throughput windows.
    let max_table_statistic_expired_time = std::cmp::max(
        env.opts.table_stat_throuput_window_seconds_for_split,
        env.opts.table_stat_throuput_window_seconds_for_merge,
    ) as i64;

    let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

    let instance = HummockManager {
        env,
        versioning: MonitoredRwLock::new(
            metrics.hummock_manager_lock_time.clone(),
            Default::default(),
            "hummock_manager::versioning",
        ),
        compaction: MonitoredRwLock::new(
            metrics.hummock_manager_lock_time.clone(),
            Default::default(),
            "hummock_manager::compaction",
        ),
        compaction_group_manager: MonitoredRwLock::new(
            metrics.hummock_manager_lock_time.clone(),
            compaction_group_manager,
            "hummock_manager::compaction_group_manager",
        ),
        context_info: MonitoredRwLock::new(
            metrics.hummock_manager_lock_time.clone(),
            Default::default(),
            "hummock_manager::context_info",
        ),
        metrics,
        metadata_manager,
        compactor_manager,
        iceberg_compactor_manager,
        event_sender: tx,
        object_store,
        version_checkpoint_path,
        version_archive_dir,
        pause_version_checkpoint: AtomicBool::new(false),
        table_write_throughput_statistic_manager: parking_lot::RwLock::new(
            TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
        ),
        table_committed_epoch_notifiers: parking_lot::Mutex::new(
            TableCommittedEpochNotifiers {
                txs: Default::default(),
            },
        ),
        compaction_task_report_notifiers: parking_lot::Mutex::new(
            CompactionTaskReportNotifiers {
                txs: Default::default(),
            },
        ),
        compactor_streams_change_tx,
        compaction_state: CompactionState::new(),
        full_gc_state: FullGcState::new().into(),
        now: Mutex::new(0),
        inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
        gc_manager,
        prefetched_compaction_task_ids: PrefetchedSequence::new(),
        table_id_to_table_option: RwLock::new(HashMap::new()),
    };
    let instance = Arc::new(instance);
    instance.init_time_travel_state().await?;
    instance.may_fill_backward_table_change_logs().await?;

    // The worker is started before the meta store state is loaded.
    instance.start_worker(rx);
    instance.load_meta_store_state().await?;
    instance.release_invalid_contexts().await?;
    instance.release_meta_context().await?;
    Ok(instance)
}
404
/// Returns the SQL meta store handle owned by `env`.
fn meta_store_ref(&self) -> &SqlMetaStore {
    self.env.meta_store_ref()
}
408
409 async fn load_meta_store_state(&self) -> Result<()> {
411 let now = self.load_now().await?;
412 *self.now.lock().await = now.unwrap_or(0);
413
414 let mut compaction_guard = self.compaction.write().await;
415 let mut versioning_guard = self.versioning.write().await;
416 let mut context_info_guard = self.context_info.write().await;
417 self.load_meta_store_state_impl(
418 &mut compaction_guard,
419 &mut versioning_guard,
420 &mut context_info_guard,
421 )
422 .await
423 }
424
/// Rebuilds the in-memory Hummock state from the meta store and the version checkpoint.
///
/// The caller passes in the already-locked `Compaction`, `Versioning` and `ContextInfo`
/// states. The function:
/// 1. loads compaction statuses and compaction task assignments;
/// 2. loads all persisted version deltas;
/// 3. reads the version checkpoint, or creates and writes an initial one;
/// 4. replays every delta newer than the checkpoint on top of it;
/// 5. loads version stats, table change logs and pinned versions;
/// 6. runs `initial_compaction_group_config_after_load`.
///
/// # Panics
///
/// Panics if a replayed delta's `prev_id` does not match the id of the version it is applied to.
async fn load_meta_store_state_impl(
    &self,
    compaction_guard: &mut Compaction,
    versioning_guard: &mut Versioning,
    context_info: &mut ContextInfo,
) -> Result<()> {
    use sea_orm::EntityTrait;
    let meta_store = self.meta_store_ref();
    let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
        compaction_status::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
            .collect();
    // Leave the existing in-memory statuses alone when nothing is persisted.
    if !compaction_statuses.is_empty() {
        compaction_guard.compaction_statuses = compaction_statuses;
    }

    compaction_guard.compact_task_assignment = compaction_task::Entity::find()
        .all(&meta_store.conn)
        .await
        .map_err(MetadataModelError::from)?
        .into_iter()
        .map(|m| {
            (
                m.id as HummockCompactionTaskId,
                compaction_task_model_to_assignment(m),
            )
        })
        .collect();

    // Keyed by version id so that the deltas can be range-scanned in order below.
    let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
        hummock_version_delta::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| {
                (
                    m.id,
                    HummockVersionDelta::from_persisted_protobuf_owned(m.into()),
                )
            })
            .collect();

    // Start the redo from the stored checkpoint if there is one; otherwise create an initial
    // version from the default compaction config and persist it as the first checkpoint.
    let checkpoint = self.try_read_checkpoint().await?;
    let mut redo_state = if let Some(c) = checkpoint {
        versioning_guard.checkpoint = c;
        versioning_guard.checkpoint.version.clone()
    } else {
        let default_compaction_config = self
            .compaction_group_manager
            .read()
            .await
            .default_compaction_config();
        let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
        tracing::info!("init hummock version checkpoint");
        versioning_guard.checkpoint = HummockVersionCheckpoint {
            version: checkpoint_version.clone(),
            stale_objects: Default::default(),
        };
        self.write_checkpoint(&versioning_guard.checkpoint).await?;
        checkpoint_version
    };
    let mut applied_delta_count = 0;
    let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
    tracing::info!(
        total_delta = hummock_version_deltas.len(),
        total_to_apply,
        "Start redo Hummock version."
    );
    // Replay only the deltas newer than the checkpoint, in id order.
    for version_delta in hummock_version_deltas
        .range(redo_state.id + 1..)
        .map(|(_, v)| v)
    {
        // Each delta must chain directly onto the version produced so far.
        assert_eq!(
            version_delta.prev_id, redo_state.id,
            "delta prev_id {}, redo state id {}",
            version_delta.prev_id, redo_state.id
        );
        redo_state.apply_version_delta(version_delta);
        applied_delta_count += 1;
        // Log progress every 1000 applied deltas.
        if applied_delta_count % 1000 == 0 {
            tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
        }
    }
    tracing::info!("Finish redo Hummock version.");
    let pruned_stale_table_id_count = redo_state.prune_stale_table_ids_from_ssts();
    if pruned_stale_table_id_count > 0 {
        tracing::warn!(
            pruned_stale_table_id_count,
            version_id = ?redo_state.id,
            "Pruned stale table ids from recovered Hummock SST metadata."
        );
    }
    // Fall back to default stats tagged with version id 0 when none are stored.
    versioning_guard.version_stats = hummock_version_stats::Entity::find()
        .one(&meta_store.conn)
        .await
        .map_err(MetadataModelError::from)?
        .map(HummockVersionStats::from)
        .unwrap_or_else(|| HummockVersionStats {
            hummock_version_id: 0.into(),
            ..Default::default()
        });

    versioning_guard.current_version = redo_state;
    versioning_guard.hummock_version_deltas = hummock_version_deltas;
    // Group the persisted change log rows by table and order each group by checkpoint epoch.
    versioning_guard.table_change_log =
        risingwave_meta_model::hummock_table_change_log::Entity::find()
            .all(&self.env.meta_store_ref().conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.table_id, to_table_change_log(m)))
            .into_group_map()
            .into_iter()
            .map(|(table_id, unordered_change_logs)| {
                (
                    table_id,
                    TableChangeLog::new(
                        unordered_change_logs
                            .into_iter()
                            .sorted_by_key(|l| l.checkpoint_epoch),
                    ),
                )
            })
            .collect();

    context_info.pinned_versions = hummock_pinned_version::Entity::find()
        .all(&meta_store.conn)
        .await
        .map_err(MetadataModelError::from)?
        .into_iter()
        .map(|m| (m.context_id as HummockContextId, m.into()))
        .collect();

    // Takes the `compaction_group_manager` write lock for the duration of this call.
    self.initial_compaction_group_config_after_load(
        versioning_guard,
        self.compaction_group_manager.write().await.deref_mut(),
    )
    .await?;

    Ok(())
}
573
/// Not supported: always panics.
///
/// # Panics
///
/// Always panics via `unimplemented!` with the message "kv meta store is deprecated"; both
/// arguments are ignored.
pub fn init_metadata_for_version_replay(
    &self,
    _table_catalogs: Vec<Table>,
    _compaction_groups: Vec<PbCompactionGroupInfo>,
) -> Result<()> {
    unimplemented!("kv meta store is deprecated");
}
581
582 pub async fn replay_version_delta(
586 &self,
587 mut version_delta: HummockVersionDelta,
588 ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
589 let mut versioning_guard = self.versioning.write().await;
590 version_delta.id = versioning_guard.current_version.next_version_id();
592 version_delta.prev_id = versioning_guard.current_version.id;
593 versioning_guard
594 .current_version
595 .apply_version_delta(&version_delta);
596
597 let version_new = versioning_guard.current_version.clone();
598 let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
599 Ok((version_new, compaction_group_ids))
600 }
601
602 pub async fn disable_commit_epoch(&self) -> HummockVersion {
603 let mut versioning_guard = self.versioning.write().await;
604 versioning_guard.disable_commit_epochs = true;
605 versioning_guard.current_version.clone()
606 }
607
/// Returns a reference to the metadata manager held by this manager.
pub fn metadata_manager(&self) -> &MetadataManager {
    &self.metadata_manager
}
611
/// Returns the media type string reported by the underlying object store.
pub fn object_store_media_type(&self) -> &'static str {
    self.object_store.media_type()
}
615
616 pub fn update_table_id_to_table_option(
617 &self,
618 new_table_id_to_table_option: HashMap<TableId, TableOption>,
619 ) {
620 *self.table_id_to_table_option.write() = new_table_id_to_table_option;
621 }
622
/// Returns a reference to the metadata manager; same value as `metadata_manager()`.
pub fn metadata_manager_ref(&self) -> &MetadataManager {
    &self.metadata_manager
}
626
627 pub async fn subscribe_table_committed_epoch(
628 &self,
629 table_id: TableId,
630 ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
631 let version = self.versioning.read().await;
632 if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
633 let (tx, rx) = unbounded_channel();
634 self.table_committed_epoch_notifiers
635 .lock()
636 .txs
637 .entry(table_id)
638 .or_default()
639 .push(tx);
640 Ok((epoch, rx))
641 } else {
642 Err(anyhow!("table {} not exist", table_id).into())
643 }
644 }
645}
646
647async fn write_exclusive_cluster_id(
648 state_store_dir: &str,
649 cluster_id: ClusterId,
650 object_store: ObjectStoreRef,
651) -> Result<()> {
652 const CLUSTER_ID_DIR: &str = "cluster_id";
653 const CLUSTER_ID_NAME: &str = "0";
654 let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
655 let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
656 tracing::info!("try reading cluster_id");
657 match object_store.read(&cluster_id_full_path, ..).await {
658 Ok(stored_cluster_id) => {
659 let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
660 if cluster_id.deref() == stored_cluster_id {
661 return Ok(());
662 }
663
664 Err(ObjectError::internal(format!(
665 "Data directory is already used by another cluster with id {:?}, path {}.",
666 stored_cluster_id, cluster_id_full_path,
667 ))
668 .into())
669 }
670 Err(e) => {
671 if e.is_object_not_found_error() {
672 tracing::info!("cluster_id not found, writing cluster_id");
673 object_store
674 .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
675 .await?;
676 return Ok(());
677 }
678 Err(e.into())
679 }
680 }
681}