risingwave_meta/hummock/manager/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use anyhow::anyhow;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::lock_api::RwLock;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
    version_archive_dir, version_checkpoint_path,
};
use risingwave_meta_model::{
    compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
    hummock_version_stats,
};
use risingwave_pb::hummock::{
    HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo,
    SubscribeCompactionEventRequest,
};
use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{Mutex, Semaphore};
use tonic::Streaming;

use crate::MetaResult;
use crate::hummock::CompactorManagerRef;
use crate::hummock::compaction::CompactStatus;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::{FullGcState, GcManager};
use crate::manager::{MetaSrvEnv, MetadataManager};
use crate::model::{ClusterId, MetadataModelError};
use crate::rpc::metrics::MetaMetrics;

mod context;
mod gc;
mod tests;
mod versioning;
pub use context::HummockVersionSafePoint;
use versioning::*;
pub(crate) mod checkpoint;
mod commit_epoch;
mod compaction;
pub mod sequence;
pub mod table_write_throughput_statistic;
pub mod time_travel;
mod timer_task;
mod transaction;
mod utils;
mod worker;

pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
use compaction::*;
pub use compaction::{GroupState, GroupStateValidator};
pub(crate) use utils::*;

/// Tracks committed-epoch subscribers per table and pushes each newly committed
/// epoch to them.
struct TableCommittedEpochNotifiers {
    txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
}

impl TableCommittedEpochNotifiers {
    fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
        self.txs.retain(|table_id, txs| {
            let mut is_dropped = false;
            let mut committed_epoch = None;
            for delta in deltas {
                if delta.removed_table_ids.contains(table_id) {
                    is_dropped = true;
                    break;
                }
                if let Some(info) = delta.state_table_info_delta.get(table_id) {
                    committed_epoch = Some(info.committed_epoch);
                }
            }
            if is_dropped {
                // The table is gone; drop all of its subscriptions.
                false
            } else if let Some(committed_epoch) = committed_epoch {
                // Notify subscribers and prune those whose receiver has been dropped.
                txs.retain(|tx| tx.send(committed_epoch).is_ok());
                !txs.is_empty()
            } else {
                true
            }
        })
    }
}
// Updates to states are performed as follows:
// - Initialize a ValTransaction for the meta state to update.
// - Make changes on the ValTransaction.
// - Call `commit_multi_var` to commit the changes via a meta store transaction. If the
//   transaction succeeds, the in-memory state is updated as well. A minimal sketch follows.
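//
// A minimal sketch of this pattern, kept in comments because the concrete
// transaction wrappers differ per call site; `version_stats_txn` and the
// mutation below are illustrative, not taken from a specific function:
//
//     // 1. Wrap the in-memory state in a ValTransaction.
//     let mut version_stats_txn = VarTransaction::new(&mut versioning.version_stats);
//     // 2. Stage changes on the transaction, not on the state directly.
//     version_stats_txn.table_stats.clear();
//     // 3. Commit via the meta store; on success the wrapped in-memory value
//     //    is updated in place.
//     commit_multi_var!(self.meta_store_ref(), version_stats_txn)?;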
pub struct HummockManager {
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    /// Lock order: `compaction`, `versioning`, `compaction_group_manager`, `context_info`.
    /// This order must be strictly followed to prevent deadlock; see the sketch after
    /// the struct definition.
    compaction: MonitoredRwLock<Compaction>,
    versioning: MonitoredRwLock<Versioning>,
    /// `CompactionGroupManager` manages compaction configs for compaction groups.
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    event_sender: HummockManagerEventSender,
    object_store: ObjectStoreRef,
    version_checkpoint_path: String,
    version_archive_dir: String,
    pause_version_checkpoint: AtomicBool,
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,

    // For compactors.
    // `compactor_streams_change_tx` passes the mapping from `context_id` to event stream
    // and is maintained in memory. All event streams are consumed through a separate event loop.
    compactor_streams_change_tx: UnboundedSender<(u32, Streaming<SubscribeCompactionEventRequest>)>,

    // `compaction_state` records the types of compaction tasks that can be triggered in `hummock`
    // and suggests types with a certain priority.
    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    now: Mutex<u64>,
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    table_id_to_table_option: parking_lot::RwLock<HashMap<u32, TableOption>>,
}
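
// A hedged illustration of the lock order documented on the fields above; the
// write guards mirror `load_meta_store_state` and are a sketch, not a required
// full sequence:
//
//     let compaction_guard = self.compaction.write().await;
//     let versioning_guard = self.versioning.write().await;
//     let compaction_group_guard = self.compaction_group_manager.write().await;
//     let context_info_guard = self.context_info.write().await;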

pub type HummockManagerRef = Arc<HummockManager>;

use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
use risingwave_pb::catalog::Table;

macro_rules! start_measure_real_process_timer {
    ($hummock_mgr:expr, $func_name:literal) => {
        $hummock_mgr
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$func_name])
            .start_timer()
    };
}
pub(crate) use start_measure_real_process_timer;
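
// Illustrative use of the macro above from inside `HummockManager`; the
// `"commit_epoch"` label is hypothetical. `start_timer()` yields a prometheus
// histogram timer guard that records the elapsed time when it is dropped:
//
//     let _timer = start_measure_real_process_timer!(self, "commit_epoch");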

use crate::controller::SqlMetaStore;
use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
use crate::hummock::manager::worker::HummockManagerEventSender;

impl HummockManager {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compactor_streams_change_tx: UnboundedSender<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let compaction_group_manager = CompactionGroupManager::new(&env).await?;
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
    }

    #[cfg(any(test, feature = "test"))]
    pub(super) async fn with_config(
        env: MetaSrvEnv,
        cluster_controller: crate::controller::cluster::ClusterControllerRef,
        catalog_controller: crate::controller::catalog::CatalogControllerRef,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        config: risingwave_pb::hummock::CompactionConfig,
        compactor_streams_change_tx: UnboundedSender<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> HummockManagerRef {
        let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
            .await
            .unwrap();
        let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
        .unwrap()
    }

    async fn new_impl(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compaction_group_manager: CompactionGroupManager,
        compactor_streams_change_tx: UnboundedSender<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let sys_params = env.system_params_reader().await;
        let state_store_url = sys_params.state_store();
        let state_store_dir: &str = sys_params.data_directory();
        let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
        let deterministic_mode = env.opts.compaction_deterministic_test;
        let mut object_store_config = env.opts.object_store_config.clone();
        // For fs and hdfs object stores, operations are not always atomic.
        // We must manually enable the atomicity guarantee by setting the `atomic_write_dir`
        // config when building services.
        object_store_config.set_atomic_write_dir();
        let object_store = Arc::new(
            build_remote_object_store(
                state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
                metrics.object_store_metric.clone(),
                "Version Checkpoint",
                Arc::new(object_store_config),
            )
            .await,
        );
        // Make sure the data dir is not used by another cluster.
        // Skip this check in the e2e compaction test, which needs to start a secondary
        // cluster with the same bucket.
        if !deterministic_mode {
            write_exclusive_cluster_id(
                state_store_dir,
                env.cluster_id().clone(),
                object_store.clone(),
            )
            .await?;

            // config bucket lifecycle for new cluster.
            if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
                && !env.opts.do_not_config_object_storage_lifecycle
            {
                let is_bucket_expiration_configured =
                    s3.inner().configure_bucket_lifecycle(state_store_dir).await;
                if is_bucket_expiration_configured {
                    return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
                    Please disable object expiration and restart the cluster.")
                    .into());
                }
            }
        }
        let version_checkpoint_path = version_checkpoint_path(state_store_dir);
        let version_archive_dir = version_archive_dir(state_store_dir);
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
        let gc_manager = GcManager::new(
            object_store.clone(),
            state_store_dir,
            use_new_object_prefix_strategy,
        );

        let max_table_statistic_expired_time = std::cmp::max(
            env.opts.table_stat_throuput_window_seconds_for_split,
            env.opts.table_stat_throuput_window_seconds_for_merge,
        ) as i64;

        let instance = HummockManager {
            env,
            versioning: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::versioning",
            ),
            compaction: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::compaction",
            ),
            compaction_group_manager: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                compaction_group_manager,
                "hummock_manager::compaction_group_manager",
            ),
            context_info: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::context_info",
            ),
            metrics,
            metadata_manager,
            compactor_manager,
            event_sender: tx,
            object_store,
            version_checkpoint_path,
            version_archive_dir,
            pause_version_checkpoint: AtomicBool::new(false),
            table_write_throughput_statistic_manager: parking_lot::RwLock::new(
                TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
            ),
            table_committed_epoch_notifiers: parking_lot::Mutex::new(
                TableCommittedEpochNotifiers {
                    txs: Default::default(),
                },
            ),
            compactor_streams_change_tx,
            compaction_state: CompactionState::new(),
            full_gc_state: FullGcState::new().into(),
            now: Mutex::new(0),
            inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
            gc_manager,
            table_id_to_table_option: RwLock::new(HashMap::new()),
        };
        let instance = Arc::new(instance);
        instance.init_time_travel_state().await?;
        instance.start_worker(rx).await;
        instance.load_meta_store_state().await?;
        instance.release_invalid_contexts().await?;
        // Release snapshots pinned by meta on restarting.
        instance.release_meta_context().await?;
        Ok(instance)
    }

    fn meta_store_ref(&self) -> &SqlMetaStore {
        self.env.meta_store_ref()
    }

    /// Load state from meta store.
    async fn load_meta_store_state(&self) -> Result<()> {
        let now = self.load_now().await?;
        *self.now.lock().await = now.unwrap_or(0);

        let mut compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let mut context_info_guard = self.context_info.write().await;
        self.load_meta_store_state_impl(
            &mut compaction_guard,
            &mut versioning_guard,
            &mut context_info_guard,
        )
        .await
    }

    /// Load state from meta store.
    async fn load_meta_store_state_impl(
        &self,
        compaction_guard: &mut Compaction,
        versioning_guard: &mut Versioning,
        context_info: &mut ContextInfo,
    ) -> Result<()> {
        use sea_orm::EntityTrait;
        let meta_store = self.meta_store_ref();
        let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
            compaction_status::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();
        if !compaction_statuses.is_empty() {
            compaction_guard.compaction_statuses = compaction_statuses;
        }

        compaction_guard.compact_task_assignment = compaction_task::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| {
                (
                    m.id as HummockCompactionTaskId,
                    PbCompactTaskAssignment::from(m),
                )
            })
            .collect();

        let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
            hummock_version_delta::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| {
                    (
                        HummockVersionId::new(m.id as _),
                        HummockVersionDelta::from_persisted_protobuf(&m.into()),
                    )
                })
                .collect();

        let checkpoint = self.try_read_checkpoint().await?;
        let mut redo_state = if let Some(c) = checkpoint {
            versioning_guard.checkpoint = c;
            versioning_guard.checkpoint.version.clone()
        } else {
            let default_compaction_config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config();
            let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
            tracing::info!("init hummock version checkpoint");
            versioning_guard.checkpoint = HummockVersionCheckpoint {
                version: checkpoint_version.clone(),
                stale_objects: Default::default(),
            };
            self.write_checkpoint(&versioning_guard.checkpoint).await?;
            checkpoint_version
        };
        let mut applied_delta_count = 0;
        let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
        tracing::info!(
            total_delta = hummock_version_deltas.len(),
            total_to_apply,
            "Start redo Hummock version."
        );
        for version_delta in hummock_version_deltas
            .range(redo_state.id + 1..)
            .map(|(_, v)| v)
        {
            assert_eq!(
                version_delta.prev_id, redo_state.id,
                "delta prev_id {}, redo state id {}",
                version_delta.prev_id, redo_state.id
            );
            redo_state.apply_version_delta(version_delta);
            applied_delta_count += 1;
            if applied_delta_count % 1000 == 0 {
                tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
            }
        }
        tracing::info!("Finish redo Hummock version.");
        versioning_guard.version_stats = hummock_version_stats::Entity::find()
            .one(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .map(HummockVersionStats::from)
            .unwrap_or_else(|| HummockVersionStats {
                // version_stats.hummock_version_id is always 0 in meta store.
                hummock_version_id: 0,
                ..Default::default()
            });

        versioning_guard.current_version = redo_state;
        versioning_guard.hummock_version_deltas = hummock_version_deltas;

        context_info.pinned_versions = hummock_pinned_version::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.context_id as HummockContextId, m.into()))
            .collect();

        self.initial_compaction_group_config_after_load(
            versioning_guard,
            self.compaction_group_manager.write().await.deref_mut(),
        )
        .await?;

        Ok(())
    }

    pub fn init_metadata_for_version_replay(
        &self,
        _table_catalogs: Vec<Table>,
        _compaction_groups: Vec<PbCompactionGroupInfo>,
    ) -> Result<()> {
        unimplemented!("kv meta store is deprecated");
    }

    /// Replays a version delta onto the current hummock version.
    /// Returns the new version and the ids of the modified compaction groups.
    pub async fn replay_version_delta(
        &self,
        mut version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let mut versioning_guard = self.versioning.write().await;
        // ensure the version id is ascending after replay
        version_delta.id = versioning_guard.current_version.next_version_id();
        version_delta.prev_id = versioning_guard.current_version.id;
        versioning_guard
            .current_version
            .apply_version_delta(&version_delta);

        let version_new = versioning_guard.current_version.clone();
        let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
        Ok((version_new, compaction_group_ids))
    }

    pub async fn disable_commit_epoch(&self) -> HummockVersion {
        let mut versioning_guard = self.versioning.write().await;
        versioning_guard.disable_commit_epochs = true;
        versioning_guard.current_version.clone()
    }

    pub fn metadata_manager(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    pub fn object_store_media_type(&self) -> &'static str {
        self.object_store.media_type()
    }

    pub fn update_table_id_to_table_option(
        &self,
        new_table_id_to_table_option: HashMap<u32, TableOption>,
    ) {
        *self.table_id_to_table_option.write() = new_table_id_to_table_option;
    }

    pub fn metadata_manager_ref(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    pub async fn subscribe_table_committed_epoch(
        &self,
        table_id: TableId,
    ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
        let version = self.versioning.read().await;
        if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
            let (tx, rx) = unbounded_channel();
            self.table_committed_epoch_notifiers
                .lock()
                .txs
                .entry(table_id)
                .or_default()
                .push(tx);
            Ok((epoch, rx))
        } else {
            Err(anyhow!("table {} does not exist", table_id).into())
        }
    }
}
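
// Hedged usage sketch for `subscribe_table_committed_epoch`; the caller and
// loop body are illustrative:
//
//     let (committed_epoch, mut rx) = hummock_manager
//         .subscribe_table_committed_epoch(table_id)
//         .await?;
//     // `committed_epoch` is the epoch at subscription time; later commits
//     // arrive on `rx` until the table is dropped or the sender side prunes
//     // this subscription.
//     while let Some(epoch) = rx.recv().await {
//         // React to the newly committed epoch of `table_id`.
//     }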

/// Makes sure the data directory is not used by another cluster: compares the
/// persisted cluster id (if any) with ours, and writes ours when none exists.
async fn write_exclusive_cluster_id(
    state_store_dir: &str,
    cluster_id: ClusterId,
    object_store: ObjectStoreRef,
) -> Result<()> {
    const CLUSTER_ID_DIR: &str = "cluster_id";
    const CLUSTER_ID_NAME: &str = "0";
    let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
    let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
    tracing::info!("try reading cluster_id");
    match object_store.read(&cluster_id_full_path, ..).await {
        Ok(stored_cluster_id) => {
            let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
            if cluster_id.deref() == stored_cluster_id {
                return Ok(());
            }

            Err(ObjectError::internal(format!(
                "Data directory is already used by another cluster with id {:?}, path {}.",
                stored_cluster_id, cluster_id_full_path,
            ))
            .into())
        }
        Err(e) => {
            if e.is_object_not_found_error() {
                tracing::info!("cluster_id not found, writing cluster_id");
                object_store
                    .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
                    .await?;
                return Ok(());
            }
            Err(e.into())
        }
    }
}
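
// Resulting object layout, derived from the constants above: the cluster id is
// stored as a single object at `{state_store_dir}/cluster_id/0`, whose content
// is the cluster id string. A second cluster pointed at the same data directory
// reads a mismatching id and fails startup with the error above.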