risingwave_meta/hummock/manager/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use anyhow::anyhow;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::lock_api::RwLock;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
    version_archive_dir, version_checkpoint_path,
};
use risingwave_meta_model::{
    compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
    hummock_version_stats,
};
use risingwave_pb::hummock::{
    HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo,
    SubscribeCompactionEventRequest,
};
use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{Mutex, Semaphore};
use tonic::Streaming;

use crate::MetaResult;
use crate::hummock::CompactorManagerRef;
use crate::hummock::compaction::CompactStatus;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::{FullGcState, GcManager};
use crate::manager::{MetaSrvEnv, MetadataManager};
use crate::model::{ClusterId, MetadataModelError};
use crate::rpc::metrics::MetaMetrics;

mod context;
mod gc;
mod tests;
mod versioning;
pub use context::HummockVersionSafePoint;
use versioning::*;
pub(crate) mod checkpoint;
mod commit_epoch;
mod compaction;
pub mod sequence;
pub mod table_write_throughput_statistic;
pub mod time_travel;
mod timer_task;
mod transaction;
mod utils;
mod worker;

pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
pub use compaction::compaction_event_loop::*;
use compaction::*;
pub use compaction::{GroupState, GroupStateValidator};
pub(crate) use utils::*;

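/// Tracks, per table, the channels of subscribers waiting for committed-epoch updates.
/// Entries are dropped when the table is removed by a version delta or when all
/// receivers have been closed.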
struct TableCommittedEpochNotifiers {
    txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
}

impl TableCommittedEpochNotifiers {
    fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
        self.txs.retain(|table_id, txs| {
            let mut is_dropped = false;
            let mut committed_epoch = None;
            for delta in deltas {
                if delta.removed_table_ids.contains(table_id) {
                    is_dropped = true;
                    break;
                }
                if let Some(info) = delta.state_table_info_delta.get(table_id) {
                    committed_epoch = Some(info.committed_epoch);
                }
            }
            if is_dropped {
                false
            } else if let Some(committed_epoch) = committed_epoch {
                txs.retain(|tx| tx.send(committed_epoch).is_ok());
                !txs.is_empty()
            } else {
                true
            }
        })
    }
}
// Updates to state are performed as follows:
// - Initialize a ValTransaction for the meta state to update.
// - Make changes on the ValTransaction.
// - Call `commit_multi_var` to commit the changes via a meta store transaction. If the
//   transaction succeeds, the in-memory state is updated as well.
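//
// A minimal sketch of the pattern (illustrative only; the wrapper constructor and
// variable names below are hypothetical stand-ins for the `ValTransaction` types and
// the `commit_multi_var!` macro defined elsewhere in this crate):
//
//     let mut stats_txn = ValTransaction::new(&mut versioning.version_stats);
//     stats_txn.total_key_count += 1; // stage a change on the wrapper
//     commit_multi_var!(self.meta_store_ref(), stats_txn)?; // commit via meta store txn;
//                                                           // in-mem state applied on success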
pub struct HummockManager {
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    /// Lock order: `compaction`, `versioning`, `compaction_group_manager`, `context_info`
    /// - Lock `compaction` first, then `versioning`, then `compaction_group_manager`, and finally `context_info`.
    /// - This order must be strictly followed to prevent deadlock.
    compaction: MonitoredRwLock<Compaction>,
    versioning: MonitoredRwLock<Versioning>,
    /// `CompactionGroupManager` manages compaction configs for compaction groups.
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    pub iceberg_compactor_manager: Arc<IcebergCompactorManager>,
    event_sender: HummockManagerEventSender,
    object_store: ObjectStoreRef,
    version_checkpoint_path: String,
    version_archive_dir: String,
    pause_version_checkpoint: AtomicBool,
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,

    // For compactors:
    // `compactor_streams_change_tx` passes the mapping from `context_id` to event stream
    // and is maintained in memory. All event streams are consumed through a separate event loop.
    compactor_streams_change_tx:
        UnboundedSender<(HummockContextId, Streaming<SubscribeCompactionEventRequest>)>,

    // `compaction_state` records the types of compaction tasks that can be triggered in `hummock`
    // and suggests task types with a certain priority.
    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    now: Mutex<u64>,
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    table_id_to_table_option: parking_lot::RwLock<HashMap<TableId, TableOption>>,
}

pub type HummockManagerRef = Arc<HummockManager>;

use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
use risingwave_pb::catalog::Table;

macro_rules! start_measure_real_process_timer {
    ($hummock_mgr:expr, $func_name:literal) => {
        $hummock_mgr
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$func_name])
            .start_timer()
    };
}
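// Typical usage at the top of a method to be timed (the label below is an arbitrary
// example): the returned prometheus histogram timer records the elapsed time when
// `_timer` is dropped at the end of the scope.
//
//     let _timer = start_measure_real_process_timer!(self, "commit_epoch");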
pub(crate) use start_measure_real_process_timer;

use super::IcebergCompactorManager;
use crate::controller::SqlMetaStore;
use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
use crate::hummock::manager::worker::HummockManagerEventSender;

impl HummockManager {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let compaction_group_manager = CompactionGroupManager::new(&env).await?;
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
    }

    #[cfg(any(test, feature = "test"))]
    pub(super) async fn with_config(
        env: MetaSrvEnv,
        cluster_controller: crate::controller::cluster::ClusterControllerRef,
        catalog_controller: crate::controller::catalog::CatalogControllerRef,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        config: risingwave_pb::hummock::CompactionConfig,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> HummockManagerRef {
        let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
            .await
            .unwrap();
        let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
        .unwrap()
    }

    async fn new_impl(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compaction_group_manager: CompactionGroupManager,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let sys_params = env.system_params_reader().await;
        let state_store_url = sys_params.state_store();

        let state_store_dir: &str = sys_params.data_directory();
        let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
        let deterministic_mode = env.opts.compaction_deterministic_test;
        let mut object_store_config = env.opts.object_store_config.clone();
        // For fs and hdfs object stores, operations are not always atomic.
        // Manually enable the atomicity guarantee by setting the `atomic_write_dir` config when building services.
        object_store_config.set_atomic_write_dir();
        let object_store = Arc::new(
            build_remote_object_store(
                state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
                metrics.object_store_metric.clone(),
                "Version Checkpoint",
                Arc::new(object_store_config),
            )
            .await,
        );
        // Make sure the data dir is not used by another cluster.
        // Skip this check in the e2e compaction test, which needs to start a secondary cluster
        // with the same bucket.
        if !deterministic_mode {
            write_exclusive_cluster_id(
                state_store_dir,
                env.cluster_id().clone(),
                object_store.clone(),
            )
            .await?;

            // Configure the bucket lifecycle for a new cluster.
            if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
                && !env.opts.do_not_config_object_storage_lifecycle
            {
                let is_bucket_expiration_configured =
                    s3.inner().configure_bucket_lifecycle(state_store_dir).await;
                if is_bucket_expiration_configured {
                    return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
                    Please disable object expiration and restart the cluster.")
                    .into());
                }
            }
        }
        let version_checkpoint_path = version_checkpoint_path(state_store_dir);
        let version_archive_dir = version_archive_dir(state_store_dir);
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
        let gc_manager = GcManager::new(
            object_store.clone(),
            state_store_dir,
            use_new_object_prefix_strategy,
        );

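        // Retain table write-throughput statistics for at least the longer of the two
        // windows, so both the split and the merge heuristics see a full window of data.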
        let max_table_statistic_expired_time = std::cmp::max(
            env.opts.table_stat_throuput_window_seconds_for_split,
            env.opts.table_stat_throuput_window_seconds_for_merge,
        ) as i64;

        let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

        let instance = HummockManager {
            env,
            versioning: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::versioning",
            ),
            compaction: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::compaction",
            ),
            compaction_group_manager: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                compaction_group_manager,
                "hummock_manager::compaction_group_manager",
            ),
            context_info: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::context_info",
            ),
            metrics,
            metadata_manager,
            compactor_manager,
            iceberg_compactor_manager,
            event_sender: tx,
            object_store,
            version_checkpoint_path,
            version_archive_dir,
            pause_version_checkpoint: AtomicBool::new(false),
            table_write_throughput_statistic_manager: parking_lot::RwLock::new(
                TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
            ),
            table_committed_epoch_notifiers: parking_lot::Mutex::new(
                TableCommittedEpochNotifiers {
                    txs: Default::default(),
                },
            ),
            compactor_streams_change_tx,
            compaction_state: CompactionState::new(),
            full_gc_state: FullGcState::new().into(),
            now: Mutex::new(0),
            inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
            gc_manager,
            table_id_to_table_option: RwLock::new(HashMap::new()),
        };
        let instance = Arc::new(instance);
        instance.init_time_travel_state().await?;
        instance.start_worker(rx);
        instance.load_meta_store_state().await?;
        instance.release_invalid_contexts().await?;
        // Release snapshots pinned by meta on restarting.
        instance.release_meta_context().await?;
        Ok(instance)
    }

    fn meta_store_ref(&self) -> &SqlMetaStore {
        self.env.meta_store_ref()
    }

    /// Load state from meta store.
    async fn load_meta_store_state(&self) -> Result<()> {
        let now = self.load_now().await?;
        *self.now.lock().await = now.unwrap_or(0);

        let mut compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let mut context_info_guard = self.context_info.write().await;
        self.load_meta_store_state_impl(
            &mut compaction_guard,
            &mut versioning_guard,
            &mut context_info_guard,
        )
        .await
    }

    /// Load state from meta store.
    async fn load_meta_store_state_impl(
        &self,
        compaction_guard: &mut Compaction,
        versioning_guard: &mut Versioning,
        context_info: &mut ContextInfo,
    ) -> Result<()> {
        use sea_orm::EntityTrait;
        let meta_store = self.meta_store_ref();
        let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
            compaction_status::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();
        if !compaction_statuses.is_empty() {
            compaction_guard.compaction_statuses = compaction_statuses;
        }

        compaction_guard.compact_task_assignment = compaction_task::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| {
                (
                    m.id as HummockCompactionTaskId,
                    PbCompactTaskAssignment::from(m),
                )
            })
            .collect();

        let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
            hummock_version_delta::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| {
                    (
                        HummockVersionId::new(m.id as _),
                        HummockVersionDelta::from_persisted_protobuf(&m.into()),
                    )
                })
                .collect();

        let checkpoint = self.try_read_checkpoint().await?;
        let mut redo_state = if let Some(c) = checkpoint {
            versioning_guard.checkpoint = c;
            versioning_guard.checkpoint.version.clone()
        } else {
            let default_compaction_config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config();
            let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
            tracing::info!("init hummock version checkpoint");
            versioning_guard.checkpoint = HummockVersionCheckpoint {
                version: checkpoint_version.clone(),
                stale_objects: Default::default(),
            };
            self.write_checkpoint(&versioning_guard.checkpoint).await?;
            checkpoint_version
        };
        let mut applied_delta_count = 0;
        let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
        tracing::info!(
            total_delta = hummock_version_deltas.len(),
            total_to_apply,
            "Start redo Hummock version."
        );
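        // Deltas must form a contiguous chain starting right after the checkpoint version;
        // the assert below enforces this before each delta is applied.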
        for version_delta in hummock_version_deltas
            .range(redo_state.id + 1..)
            .map(|(_, v)| v)
        {
            assert_eq!(
                version_delta.prev_id, redo_state.id,
                "delta prev_id {}, redo state id {}",
                version_delta.prev_id, redo_state.id
            );
            redo_state.apply_version_delta(version_delta);
            applied_delta_count += 1;
            if applied_delta_count % 1000 == 0 {
                tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
            }
        }
        tracing::info!("Finish redo Hummock version.");
        versioning_guard.version_stats = hummock_version_stats::Entity::find()
            .one(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .map(HummockVersionStats::from)
            .unwrap_or_else(|| HummockVersionStats {
                // version_stats.hummock_version_id is always 0 in meta store.
                hummock_version_id: 0,
                ..Default::default()
            });

        versioning_guard.current_version = redo_state;
        versioning_guard.hummock_version_deltas = hummock_version_deltas;

        context_info.pinned_versions = hummock_pinned_version::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.context_id as HummockContextId, m.into()))
            .collect();

        self.initial_compaction_group_config_after_load(
            versioning_guard,
            self.compaction_group_manager.write().await.deref_mut(),
        )
        .await?;

        Ok(())
    }

    pub fn init_metadata_for_version_replay(
        &self,
        _table_catalogs: Vec<Table>,
        _compaction_groups: Vec<PbCompactionGroupInfo>,
    ) -> Result<()> {
        unimplemented!("kv meta store is deprecated");
    }

    /// Replay a version delta to the current hummock version.
    /// Returns the new `HummockVersion` and the ids of the modified compaction groups.
    pub async fn replay_version_delta(
        &self,
        mut version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let mut versioning_guard = self.versioning.write().await;
        // Ensure the version id is ascending after replay.
        version_delta.id = versioning_guard.current_version.next_version_id();
        version_delta.prev_id = versioning_guard.current_version.id;
        versioning_guard
            .current_version
            .apply_version_delta(&version_delta);

        let version_new = versioning_guard.current_version.clone();
        let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
        Ok((version_new, compaction_group_ids))
    }

    pub async fn disable_commit_epoch(&self) -> HummockVersion {
        let mut versioning_guard = self.versioning.write().await;
        versioning_guard.disable_commit_epochs = true;
        versioning_guard.current_version.clone()
    }

    pub fn metadata_manager(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    pub fn object_store_media_type(&self) -> &'static str {
        self.object_store.media_type()
    }

    pub fn update_table_id_to_table_option(
        &self,
        new_table_id_to_table_option: HashMap<TableId, TableOption>,
    ) {
        *self.table_id_to_table_option.write() = new_table_id_to_table_option;
    }

    pub fn metadata_manager_ref(&self) -> &MetadataManager {
        &self.metadata_manager
    }

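    /// Subscribe to committed-epoch updates for `table_id`. Returns the table's current
    /// committed epoch together with a receiver that yields each subsequently committed
    /// epoch, or an error if the table does not exist in the current version.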
    pub async fn subscribe_table_committed_epoch(
        &self,
        table_id: TableId,
    ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
        let version = self.versioning.read().await;
        if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
            let (tx, rx) = unbounded_channel();
            self.table_committed_epoch_notifiers
                .lock()
                .txs
                .entry(table_id)
                .or_default()
                .push(tx);
            Ok((epoch, rx))
        } else {
            Err(anyhow!("table {} does not exist", table_id).into())
        }
    }
}

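/// Guards the data directory against concurrent use by multiple clusters: reads the
/// cluster id stored under `{state_store_dir}/cluster_id/0` and succeeds only if it is
/// absent (in which case this cluster's id is written) or matches `cluster_id`.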
async fn write_exclusive_cluster_id(
    state_store_dir: &str,
    cluster_id: ClusterId,
    object_store: ObjectStoreRef,
) -> Result<()> {
    const CLUSTER_ID_DIR: &str = "cluster_id";
    const CLUSTER_ID_NAME: &str = "0";
    let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
    let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
    tracing::info!("try reading cluster_id");
    match object_store.read(&cluster_id_full_path, ..).await {
        Ok(stored_cluster_id) => {
            let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
            if cluster_id.deref() == stored_cluster_id {
                return Ok(());
            }

            Err(ObjectError::internal(format!(
                "Data directory is already used by another cluster with id {:?}, path {}.",
                stored_cluster_id, cluster_id_full_path,
            ))
            .into())
        }
        Err(e) => {
            if e.is_object_not_found_error() {
                tracing::info!("cluster_id not found, writing cluster_id");
                object_store
                    .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
                    .await?;
                return Ok(());
            }
            Err(e.into())
        }
    }
}