risingwave_meta/hummock/manager/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use anyhow::anyhow;
use bytes::Bytes;
use itertools::Itertools;
use parking_lot::lock_api::RwLock;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
    version_archive_dir, version_checkpoint_path,
};
use risingwave_meta_model::{
    compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
    hummock_version_stats,
};
use risingwave_pb::hummock::{
    HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo,
    SubscribeCompactionEventRequest,
};
use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::{Mutex, Semaphore};
use tonic::Streaming;

use crate::MetaResult;
use crate::hummock::CompactorManagerRef;
use crate::hummock::compaction::CompactStatus;
use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::{FullGcState, GcManager};
use crate::manager::{MetaSrvEnv, MetadataManager};
use crate::model::{ClusterId, MetadataModelError};
use crate::rpc::metrics::MetaMetrics;

mod context;
mod gc;
mod tests;
mod versioning;
pub use context::HummockVersionSafePoint;
use versioning::*;
pub(crate) mod checkpoint;
mod commit_epoch;
mod compaction;
pub mod sequence;
pub mod table_write_throughput_statistic;
pub mod time_travel;
mod timer_task;
mod transaction;
mod utils;
mod worker;

pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
pub use compaction::compaction_event_loop::*;
use compaction::*;
pub use compaction::{GroupState, GroupStateValidator};
pub(crate) use utils::*;

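/// Tracks per-table subscribers that are notified whenever a new committed epoch
/// becomes visible for their table.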
struct TableCommittedEpochNotifiers {
    txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
}

impl TableCommittedEpochNotifiers {
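    /// Walks `deltas` in order and, for every subscribed table, sends its latest
    /// committed epoch. Entries for dropped tables and closed receivers are pruned.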
    fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
        self.txs.retain(|table_id, txs| {
            let mut is_dropped = false;
            let mut committed_epoch = None;
            for delta in deltas {
                if delta.removed_table_ids.contains(table_id) {
                    is_dropped = true;
                    break;
                }
                if let Some(info) = delta.state_table_info_delta.get(table_id) {
                    committed_epoch = Some(info.committed_epoch);
                }
            }
            if is_dropped {
                false
            } else if let Some(committed_epoch) = committed_epoch {
                txs.retain(|tx| tx.send(committed_epoch).is_ok());
                !txs.is_empty()
            } else {
                true
            }
        })
    }
}
// Updates to state are performed as follows:
// - Initialize a ValTransaction for the meta state to update.
// - Make changes on the ValTransaction.
// - Call `commit_multi_var` to commit the changes via a meta store transaction. If the
//   transaction succeeds, the in-memory state is updated as well.
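//
// A rough sketch of the pattern (the transaction wrapper and field names below are
// illustrative, not the exact APIs used in this module):
//
//     let mut stats_txn = /* ValTransaction wrapping versioning.version_stats */;
//     /* mutate the in-memory copy through stats_txn ... */
//     commit_multi_var!(self.meta_store_ref(), stats_txn)?;
//     // On success, the wrapped in-memory state already reflects the change.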
pub struct HummockManager {
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    /// Lock order: `compaction`, `versioning`, `compaction_group_manager`, `context_info`
    /// - Lock `compaction` first, then `versioning`, then `compaction_group_manager` and finally `context_info`.
    /// - This order should be strictly followed to prevent deadlock.
    compaction: MonitoredRwLock<Compaction>,
    versioning: MonitoredRwLock<Versioning>,
    /// `CompactionGroupManager` manages compaction configs for compaction groups.
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    pub iceberg_compactor_manager: Arc<IcebergCompactorManager>,
    event_sender: HummockManagerEventSender,
    object_store: ObjectStoreRef,
    version_checkpoint_path: String,
    version_archive_dir: String,
    pause_version_checkpoint: AtomicBool,
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,

    // for compactor
    // `compactor_streams_change_tx` is used to pass the mapping from `context_id` to event_stream
    // and is maintained in memory. All event_streams are consumed through a separate event loop
    compactor_streams_change_tx: UnboundedSender<(u32, Streaming<SubscribeCompactionEventRequest>)>,

    // `compaction_state` will record the types of compact tasks that can be triggered in `hummock`
    // and suggest types with a certain priority.
    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    now: Mutex<u64>,
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    table_id_to_table_option: parking_lot::RwLock<HashMap<u32, TableOption>>,
}

pub type HummockManagerRef = Arc<HummockManager>;

use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
use risingwave_pb::catalog::Table;

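/// Starts a timer on the `hummock_manager_real_process_time` metric labeled with `$func_name`,
/// measuring the wall-clock time spent in the calling `HummockManager` method. Illustrative
/// usage: `let _timer = start_measure_real_process_timer!(self, "commit_epoch");`.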
macro_rules! start_measure_real_process_timer {
    ($hummock_mgr:expr, $func_name:literal) => {
        $hummock_mgr
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$func_name])
            .start_timer()
    };
}
pub(crate) use start_measure_real_process_timer;

use super::IcebergCompactorManager;
use crate::controller::SqlMetaStore;
use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
use crate::hummock::manager::worker::HummockManagerEventSender;

impl HummockManager {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compactor_streams_change_tx: UnboundedSender<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let compaction_group_manager = CompactionGroupManager::new(&env).await?;
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
    }

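    /// Builds a `HummockManager` with the given compaction config. Intended for tests only.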
    #[cfg(any(test, feature = "test"))]
    pub(super) async fn with_config(
        env: MetaSrvEnv,
        cluster_controller: crate::controller::cluster::ClusterControllerRef,
        catalog_controller: crate::controller::catalog::CatalogControllerRef,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        config: risingwave_pb::hummock::CompactionConfig,
        compactor_streams_change_tx: UnboundedSender<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> HummockManagerRef {
        let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
            .await
            .unwrap();
        let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
        .unwrap()
    }

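    /// Shared constructor: builds the remote object store, verifies exclusive ownership of
    /// the data directory, assembles the `HummockManager`, and loads persisted state before
    /// returning the instance.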
    async fn new_impl(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compaction_group_manager: CompactionGroupManager,
        compactor_streams_change_tx: UnboundedSender<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let sys_params = env.system_params_reader().await;
        let state_store_url = sys_params.state_store();
        let state_store_dir: &str = sys_params.data_directory();
        let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
        let deterministic_mode = env.opts.compaction_deterministic_test;
        let mut object_store_config = env.opts.object_store_config.clone();
        // For fs and hdfs object stores, operations are not always atomic.
        // We must manually enable the atomicity guarantee by setting the `atomic_write_dir` config when building services.
        object_store_config.set_atomic_write_dir();
        let object_store = Arc::new(
            build_remote_object_store(
                state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
                metrics.object_store_metric.clone(),
                "Version Checkpoint",
                Arc::new(object_store_config),
            )
            .await,
        );
        // Make sure the data dir is not used by another cluster.
        // Skip this check in the e2e compaction test, which needs to start a secondary cluster
        // with the same bucket.
        if !deterministic_mode {
            write_exclusive_cluster_id(
                state_store_dir,
                env.cluster_id().clone(),
                object_store.clone(),
            )
            .await?;

            // config bucket lifecycle for new cluster.
            if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
                && !env.opts.do_not_config_object_storage_lifecycle
            {
                let is_bucket_expiration_configured =
                    s3.inner().configure_bucket_lifecycle(state_store_dir).await;
                if is_bucket_expiration_configured {
                    return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
                    Please disable object expiration and restart the cluster.")
                    .into());
                }
            }
        }
        let version_checkpoint_path = version_checkpoint_path(state_store_dir);
        let version_archive_dir = version_archive_dir(state_store_dir);
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
        let gc_manager = GcManager::new(
            object_store.clone(),
            state_store_dir,
            use_new_object_prefix_strategy,
        );

        let max_table_statistic_expired_time = std::cmp::max(
            env.opts.table_stat_throuput_window_seconds_for_split,
            env.opts.table_stat_throuput_window_seconds_for_merge,
        ) as i64;

        let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

        let instance = HummockManager {
            env,
            versioning: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::versioning",
            ),
            compaction: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::compaction",
            ),
            compaction_group_manager: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                compaction_group_manager,
                "hummock_manager::compaction_group_manager",
            ),
            context_info: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::context_info",
            ),
            metrics,
            metadata_manager,
            compactor_manager,
            iceberg_compactor_manager,
            event_sender: tx,
            object_store,
            version_checkpoint_path,
            version_archive_dir,
            pause_version_checkpoint: AtomicBool::new(false),
            table_write_throughput_statistic_manager: parking_lot::RwLock::new(
                TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
            ),
            table_committed_epoch_notifiers: parking_lot::Mutex::new(
                TableCommittedEpochNotifiers {
                    txs: Default::default(),
                },
            ),
            compactor_streams_change_tx,
            compaction_state: CompactionState::new(),
            full_gc_state: FullGcState::new().into(),
            now: Mutex::new(0),
            inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
            gc_manager,
            table_id_to_table_option: RwLock::new(HashMap::new()),
        };
        let instance = Arc::new(instance);
        instance.init_time_travel_state().await?;
        instance.start_worker(rx).await;
        instance.load_meta_store_state().await?;
        instance.release_invalid_contexts().await?;
        // Release snapshots pinned by meta on restarting.
        instance.release_meta_context().await?;
        Ok(instance)
    }

    fn meta_store_ref(&self) -> &SqlMetaStore {
        self.env.meta_store_ref()
    }

    /// Load state from meta store.
    async fn load_meta_store_state(&self) -> Result<()> {
        let now = self.load_now().await?;
        *self.now.lock().await = now.unwrap_or(0);

        let mut compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let mut context_info_guard = self.context_info.write().await;
        self.load_meta_store_state_impl(
            &mut compaction_guard,
            &mut versioning_guard,
            &mut context_info_guard,
        )
        .await
    }

    /// Load state from meta store.
    async fn load_meta_store_state_impl(
        &self,
        compaction_guard: &mut Compaction,
        versioning_guard: &mut Versioning,
        context_info: &mut ContextInfo,
    ) -> Result<()> {
        use sea_orm::EntityTrait;
        let meta_store = self.meta_store_ref();
        let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
            compaction_status::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();
        if !compaction_statuses.is_empty() {
            compaction_guard.compaction_statuses = compaction_statuses;
        }

        compaction_guard.compact_task_assignment = compaction_task::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| {
                (
                    m.id as HummockCompactionTaskId,
                    PbCompactTaskAssignment::from(m),
                )
            })
            .collect();

        let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
            hummock_version_delta::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| {
                    (
                        HummockVersionId::new(m.id as _),
                        HummockVersionDelta::from_persisted_protobuf(&m.into()),
                    )
                })
                .collect();

        let checkpoint = self.try_read_checkpoint().await?;
        let mut redo_state = if let Some(c) = checkpoint {
            versioning_guard.checkpoint = c;
            versioning_guard.checkpoint.version.clone()
        } else {
            let default_compaction_config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config();
            let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
            tracing::info!("init hummock version checkpoint");
            versioning_guard.checkpoint = HummockVersionCheckpoint {
                version: checkpoint_version.clone(),
                stale_objects: Default::default(),
            };
            self.write_checkpoint(&versioning_guard.checkpoint).await?;
            checkpoint_version
        };
        let mut applied_delta_count = 0;
        let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
        tracing::info!(
            total_delta = hummock_version_deltas.len(),
            total_to_apply,
            "Start redo Hummock version."
        );
        for version_delta in hummock_version_deltas
            .range(redo_state.id + 1..)
            .map(|(_, v)| v)
        {
            assert_eq!(
                version_delta.prev_id, redo_state.id,
                "delta prev_id {}, redo state id {}",
                version_delta.prev_id, redo_state.id
            );
            redo_state.apply_version_delta(version_delta);
            applied_delta_count += 1;
            if applied_delta_count % 1000 == 0 {
                tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
            }
        }
        tracing::info!("Finish redo Hummock version.");
        versioning_guard.version_stats = hummock_version_stats::Entity::find()
            .one(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .map(HummockVersionStats::from)
            .unwrap_or_else(|| HummockVersionStats {
                // version_stats.hummock_version_id is always 0 in meta store.
                hummock_version_id: 0,
                ..Default::default()
            });

        versioning_guard.current_version = redo_state;
        versioning_guard.hummock_version_deltas = hummock_version_deltas;

        context_info.pinned_versions = hummock_pinned_version::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.context_id as HummockContextId, m.into()))
            .collect();

        self.initial_compaction_group_config_after_load(
            versioning_guard,
            self.compaction_group_manager.write().await.deref_mut(),
        )
        .await?;

        Ok(())
    }

    pub fn init_metadata_for_version_replay(
        &self,
        _table_catalogs: Vec<Table>,
        _compaction_groups: Vec<PbCompactionGroupInfo>,
    ) -> Result<()> {
        unimplemented!("kv meta store is deprecated");
    }

    /// Replays a version delta to the current hummock version.
    /// Returns the new version and the compaction groups modified by the delta.
    pub async fn replay_version_delta(
        &self,
        mut version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let mut versioning_guard = self.versioning.write().await;
        // ensure the version id is ascending after replay
        version_delta.id = versioning_guard.current_version.next_version_id();
        version_delta.prev_id = versioning_guard.current_version.id;
        versioning_guard
            .current_version
            .apply_version_delta(&version_delta);

        let version_new = versioning_guard.current_version.clone();
        let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
        Ok((version_new, compaction_group_ids))
    }

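    /// Disables further epoch commits on this manager and returns the current version.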
    pub async fn disable_commit_epoch(&self) -> HummockVersion {
        let mut versioning_guard = self.versioning.write().await;
        versioning_guard.disable_commit_epochs = true;
        versioning_guard.current_version.clone()
    }

    pub fn metadata_manager(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    pub fn object_store_media_type(&self) -> &'static str {
        self.object_store.media_type()
    }

    pub fn update_table_id_to_table_option(
        &self,
        new_table_id_to_table_option: HashMap<u32, TableOption>,
    ) {
        *self.table_id_to_table_option.write() = new_table_id_to_table_option;
    }

    pub fn metadata_manager_ref(&self) -> &MetadataManager {
        &self.metadata_manager
    }

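    /// Subscribes to committed-epoch updates for `table_id`. Returns the table's current
    /// committed epoch and a receiver for later updates, or an error if the table does not
    /// exist in the current version.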
    pub async fn subscribe_table_committed_epoch(
        &self,
        table_id: TableId,
    ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
        let version = self.versioning.read().await;
        if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
            let (tx, rx) = unbounded_channel();
            self.table_committed_epoch_notifiers
                .lock()
                .txs
                .entry(table_id)
                .or_default()
                .push(tx);
            Ok((epoch, rx))
        } else {
            Err(anyhow!("table {} does not exist", table_id).into())
        }
    }
}

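/// Ensures the data directory is owned by exactly one cluster: reads the stored cluster id
/// under `cluster_id/0`, succeeds if it matches `cluster_id`, writes it if absent, and
/// returns an error if the directory already belongs to a different cluster.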
async fn write_exclusive_cluster_id(
    state_store_dir: &str,
    cluster_id: ClusterId,
    object_store: ObjectStoreRef,
) -> Result<()> {
    const CLUSTER_ID_DIR: &str = "cluster_id";
    const CLUSTER_ID_NAME: &str = "0";
    let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
    let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
    tracing::info!("try reading cluster_id");
    match object_store.read(&cluster_id_full_path, ..).await {
        Ok(stored_cluster_id) => {
            let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
            if cluster_id.deref() == stored_cluster_id {
                return Ok(());
            }

            Err(ObjectError::internal(format!(
                "Data directory is already used by another cluster with id {:?}, path {}.",
                stored_cluster_id, cluster_id_full_path,
            ))
            .into())
        }
        Err(e) => {
            if e.is_object_not_found_error() {
                tracing::info!("cluster_id not found, writing cluster_id");
                object_store
                    .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
                    .await?;
                return Ok(());
            }
            Err(e.into())
        }
    }
}