risingwave_meta/hummock/manager/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19
20use anyhow::anyhow;
21use bytes::Bytes;
22use itertools::Itertools;
23use parking_lot::lock_api::RwLock;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::monitor::MonitoredRwLock;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
28use risingwave_hummock_sdk::{
29    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
30    version_archive_dir, version_checkpoint_path,
31};
32use risingwave_meta_model::{
33    compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
34    hummock_version_stats,
35};
36use risingwave_pb::hummock::compact_task::TaskStatus;
37use risingwave_pb::hummock::{
38    HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo,
39    SubscribeCompactionEventRequest,
40};
41use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
42use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
43use tokio::sync::{Mutex, Semaphore, oneshot};
44use tonic::Streaming;
45
46use crate::MetaResult;
47use crate::hummock::CompactorManagerRef;
48use crate::hummock::compaction::CompactStatus;
49use crate::hummock::error::Result;
50use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
51use crate::hummock::manager::context::ContextInfo;
52use crate::hummock::manager::gc::{FullGcState, GcManager};
53use crate::manager::{MetaSrvEnv, MetadataManager};
54use crate::model::{ClusterId, MetadataModelError};
55use crate::rpc::metrics::MetaMetrics;
56
57mod context;
58mod gc;
59mod tests;
60mod versioning;
61pub use context::HummockVersionSafePoint;
62use versioning::*;
63pub(crate) mod checkpoint;
64mod commit_epoch;
65mod compaction;
66pub mod sequence;
67pub mod table_write_throughput_statistic;
68pub mod time_travel;
69mod timer_task;
70mod transaction;
71mod utils;
72mod worker;
73
74pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
75pub use compaction::compaction_event_loop::*;
76use compaction::*;
77pub use compaction::{GroupState, GroupStateValidator, ManualCompactionTriggerResult};
78pub(crate) use utils::*;
79
/// Registry of subscribers waiting for committed-epoch updates of specific tables.
struct TableCommittedEpochNotifiers {
    // Per-table list of channel senders; each sender delivers newly committed epochs
    // to one subscriber (see `HummockManager::subscribe_table_committed_epoch`).
    txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
}
83
84impl TableCommittedEpochNotifiers {
85    fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
86        self.txs.retain(|table_id, txs| {
87            let mut is_dropped = false;
88            let mut committed_epoch = None;
89            for delta in deltas {
90                if delta.removed_table_ids.contains(table_id) {
91                    is_dropped = true;
92                    break;
93                }
94                if let Some(info) = delta.state_table_info_delta.get(table_id) {
95                    committed_epoch = Some(info.committed_epoch);
96                }
97            }
98            if is_dropped {
99                false
100            } else if let Some(committed_epoch) = committed_epoch {
101                txs.retain(|tx| tx.send(committed_epoch).is_ok());
102                !txs.is_empty()
103            } else {
104                true
105            }
106        })
107    }
108}
109
/// Outcome of a compaction task report, broadcast to registered waiters.
#[derive(Clone, Debug)]
struct CompactionTaskReportResult {
    // Id of the reported compaction task.
    task_id: HummockCompactionTaskId,
    // Final status of the task; currently carried for debugging only.
    #[allow(dead_code)]
    task_status: TaskStatus,
    // Whether the task was actually reported (as opposed to e.g. being cancelled).
    // NOTE(review): exact semantics are set by the producer of this value elsewhere — confirm there.
    reported: bool,
}
117
/// Registry of oneshot waiters interested in the report result of specific compaction tasks.
struct CompactionTaskReportNotifiers {
    // Per-task list of oneshot senders; each is fired at most once when the task is reported.
    txs: HashMap<HummockCompactionTaskId, Vec<oneshot::Sender<CompactionTaskReportResult>>>,
}
121
122impl CompactionTaskReportNotifiers {
123    fn register(
124        &mut self,
125        task_id: HummockCompactionTaskId,
126        tx: oneshot::Sender<CompactionTaskReportResult>,
127    ) {
128        self.txs.entry(task_id).or_default().push(tx);
129    }
130
131    fn remove(&mut self, task_id: HummockCompactionTaskId) {
132        self.txs.remove(&task_id);
133    }
134
135    fn notify(&mut self, result: CompactionTaskReportResult) {
136        if let Some(txs) = self.txs.remove(&result.task_id) {
137            for tx in txs {
138                let _ = tx.send(result.clone());
139            }
140        }
141    }
142}
// Updates to states are performed as follows:
// - Initialize ValTransaction for the meta state to update
// - Make changes on the ValTransaction.
// - Call `commit_multi_var` to commit the changes via meta store transaction. If transaction
//   succeeds, the in-mem state will be updated by the way.
pub struct HummockManager {
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    /// Lock order: `compaction`, `versioning`, `compaction_group_manager`, `context_info`
    /// - Lock `compaction` first, then `versioning`, then `compaction_group_manager` and finally `context_info`.
    /// - This order should be strictly followed to prevent deadlock.
    compaction: MonitoredRwLock<Compaction>,
    versioning: MonitoredRwLock<Versioning>,
    /// `CompactionGroupManager` manages compaction configs for compaction groups.
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    pub iceberg_compactor_manager: Arc<IcebergCompactorManager>,
    /// Sender half of the event channel; the receiver half is consumed by `start_worker`
    /// during construction (see `new_impl`).
    event_sender: HummockManagerEventSender,
    /// Object store used for the version checkpoint, version archive and GC.
    object_store: ObjectStoreRef,
    version_checkpoint_path: String,
    version_archive_dir: String,
    // NOTE(review): presumably pauses version checkpointing while true — confirm in the
    // `checkpoint` module which reads this flag.
    pause_version_checkpoint: AtomicBool,
    /// Tracks per-table write throughput; entries expire after the configured window
    /// (see `max_table_statistic_expired_time` in `new_impl`).
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    /// Subscribers notified with newly committed epochs per table.
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,
    /// Oneshot waiters notified when a compaction task is reported.
    compaction_task_report_notifiers: parking_lot::Mutex<CompactionTaskReportNotifiers>,

    // for compactor
    // `compactor_streams_change_tx` is used to pass the mapping from `context_id` to event_stream
    // and is maintained in memory. All event_streams are consumed through a separate event loop
    compactor_streams_change_tx:
        UnboundedSender<(HummockContextId, Streaming<SubscribeCompactionEventRequest>)>,

    // `compaction_state` will record the types of compact tasks that can be triggered in `hummock`
    // and suggest types with a certain priority.
    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    // Monotonic "now" timestamp loaded from / persisted to the meta store (see `load_now`).
    now: Mutex<u64>,
    // Limits the number of concurrently running time-travel queries.
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    // Cached mapping from table id to its table options; replaced wholesale by
    // `update_table_id_to_table_option`.
    table_id_to_table_option: parking_lot::RwLock<HashMap<TableId, TableOption>>,
}
191
/// Shared, reference-counted handle to a [`HummockManager`].
pub type HummockManagerRef = Arc<HummockManager>;
193
194use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
195use risingwave_pb::catalog::Table;
196
/// Starts a timer observing the wall-clock duration of a `HummockManager` operation in the
/// `hummock_manager_real_process_time` metric, labeled by `$func_name`.
/// Returns the timer guard from `start_timer`; bind it to a local so the measurement
/// covers the intended scope.
macro_rules! start_measure_real_process_timer {
    ($hummock_mgr:expr, $func_name:literal) => {
        $hummock_mgr
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$func_name])
            .start_timer()
    };
}
206pub(crate) use start_measure_real_process_timer;
207
208use super::IcebergCompactorManager;
209use crate::controller::SqlMetaStore;
210use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
211use crate::hummock::manager::worker::HummockManagerEventSender;
212
impl HummockManager {
    /// Creates a `HummockManager`, loading the `CompactionGroupManager` state from the meta
    /// store before delegating to [`Self::new_impl`].
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let compaction_group_manager = CompactionGroupManager::new(&env).await?;
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
    }

    /// Test-only constructor that builds the `CompactionGroupManager` from the given
    /// compaction `config` instead of persisted state.
    ///
    /// Panics if construction fails (uses `unwrap`), which is acceptable in tests.
    #[cfg(any(test, feature = "test"))]
    pub(super) async fn with_config(
        env: MetaSrvEnv,
        cluster_controller: crate::controller::cluster::ClusterControllerRef,
        catalog_controller: crate::controller::catalog::CatalogControllerRef,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        config: risingwave_pb::hummock::CompactionConfig,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> HummockManagerRef {
        let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
            .await
            .unwrap();
        let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
        .unwrap()
    }

    /// Shared constructor body: builds the object store, verifies exclusive ownership of the
    /// data directory, assembles the `HummockManager`, then initializes time-travel state,
    /// starts the worker loop, replays meta-store state, and releases stale contexts.
    async fn new_impl(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compaction_group_manager: CompactionGroupManager,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let sys_params = env.system_params_reader().await;
        let state_store_url = sys_params.state_store();

        let state_store_dir: &str = sys_params.data_directory();
        let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
        let deterministic_mode = env.opts.compaction_deterministic_test;
        let mut object_store_config = env.opts.object_store_config.clone();
        // For fs and hdfs object store, operations are not always atomic.
        // We should manually enable atomicity guarantee by setting the atomic_write_dir config when building services.
        object_store_config.set_atomic_write_dir();
        let object_store = Arc::new(
            build_remote_object_store(
                // Falls back to an in-memory store when the url lacks the "hummock+" scheme.
                state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
                metrics.object_store_metric.clone(),
                "Version Checkpoint",
                Arc::new(object_store_config),
            )
            .await,
        );
        // Make sure data dir is not used by another cluster.
        // Skip this check in e2e compaction test, which needs to start a secondary cluster with
        // same bucket
        if !deterministic_mode {
            write_exclusive_cluster_id(
                state_store_dir,
                env.cluster_id().clone(),
                object_store.clone(),
            )
            .await?;

            // config bucket lifecycle for new cluster.
            if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
                && !env.opts.do_not_config_object_storage_lifecycle
            {
                let is_bucket_expiration_configured =
                    s3.inner().configure_bucket_lifecycle(state_store_dir).await;
                if is_bucket_expiration_configured {
                    // Refuse to start: object expiration would silently delete RisingWave data.
                    return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
                    Please disable object expiration and restart the cluster.")
                    .into());
                }
            }
        }
        let version_checkpoint_path = version_checkpoint_path(state_store_dir);
        let version_archive_dir = version_archive_dir(state_store_dir);
        // Event channel: `tx` is stored as `event_sender`; `rx` is handed to `start_worker`.
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
        let gc_manager = GcManager::new(
            object_store.clone(),
            state_store_dir,
            use_new_object_prefix_strategy,
        );

        // Statistics must live at least as long as the larger of the split/merge windows.
        let max_table_statistic_expired_time = std::cmp::max(
            env.opts.table_stat_throuput_window_seconds_for_split,
            env.opts.table_stat_throuput_window_seconds_for_merge,
        ) as i64;

        let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

        let instance = HummockManager {
            env,
            versioning: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::versioning",
            ),
            compaction: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::compaction",
            ),
            compaction_group_manager: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                compaction_group_manager,
                "hummock_manager::compaction_group_manager",
            ),
            context_info: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::context_info",
            ),
            metrics,
            metadata_manager,
            compactor_manager,
            iceberg_compactor_manager,
            event_sender: tx,
            object_store,
            version_checkpoint_path,
            version_archive_dir,
            pause_version_checkpoint: AtomicBool::new(false),
            table_write_throughput_statistic_manager: parking_lot::RwLock::new(
                TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
            ),
            table_committed_epoch_notifiers: parking_lot::Mutex::new(
                TableCommittedEpochNotifiers {
                    txs: Default::default(),
                },
            ),
            compaction_task_report_notifiers: parking_lot::Mutex::new(
                CompactionTaskReportNotifiers {
                    txs: Default::default(),
                },
            ),
            compactor_streams_change_tx,
            compaction_state: CompactionState::new(),
            full_gc_state: FullGcState::new().into(),
            now: Mutex::new(0),
            inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
            gc_manager,
            table_id_to_table_option: RwLock::new(HashMap::new()),
        };
        let instance = Arc::new(instance);
        // Post-construction initialization; order matters (worker must run before state replay
        // can emit events, contexts are released only after state is loaded).
        instance.init_time_travel_state().await?;
        instance.start_worker(rx);
        instance.load_meta_store_state().await?;
        instance.release_invalid_contexts().await?;
        // Release snapshots pinned by meta on restarting.
        instance.release_meta_context().await?;
        Ok(instance)
    }

    /// Returns the SQL meta store handle from the environment.
    fn meta_store_ref(&self) -> &SqlMetaStore {
        self.env.meta_store_ref()
    }

    /// Load state from meta store.
    ///
    /// Acquires the `compaction`, `versioning` and `context_info` write locks in the
    /// documented lock order before delegating to `load_meta_store_state_impl`.
    async fn load_meta_store_state(&self) -> Result<()> {
        let now = self.load_now().await?;
        *self.now.lock().await = now.unwrap_or(0);

        let mut compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let mut context_info_guard = self.context_info.write().await;
        self.load_meta_store_state_impl(
            &mut compaction_guard,
            &mut versioning_guard,
            &mut context_info_guard,
        )
        .await
    }

    /// Load state from meta store.
    ///
    /// Restores compaction statuses and task assignments, rebuilds the current Hummock
    /// version by replaying persisted deltas on top of the latest checkpoint (writing an
    /// initial checkpoint if none exists), and loads version stats and pinned versions.
    async fn load_meta_store_state_impl(
        &self,
        compaction_guard: &mut Compaction,
        versioning_guard: &mut Versioning,
        context_info: &mut ContextInfo,
    ) -> Result<()> {
        use sea_orm::EntityTrait;
        let meta_store = self.meta_store_ref();
        let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
            compaction_status::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();
        // Keep the in-memory defaults when nothing was persisted.
        if !compaction_statuses.is_empty() {
            compaction_guard.compaction_statuses = compaction_statuses;
        }

        compaction_guard.compact_task_assignment = compaction_task::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| {
                (
                    m.id as HummockCompactionTaskId,
                    PbCompactTaskAssignment::from(m),
                )
            })
            .collect();

        let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
            hummock_version_delta::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| {
                    (
                        m.id,
                        HummockVersionDelta::from_persisted_protobuf_owned(m.into()),
                    )
                })
                .collect();

        // Start redo from the latest checkpoint, or write an initial checkpoint on first boot.
        let checkpoint = self.try_read_checkpoint().await?;
        let mut redo_state = if let Some(c) = checkpoint {
            versioning_guard.checkpoint = c;
            versioning_guard.checkpoint.version.clone()
        } else {
            let default_compaction_config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config();
            let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
            tracing::info!("init hummock version checkpoint");
            versioning_guard.checkpoint = HummockVersionCheckpoint {
                version: checkpoint_version.clone(),
                stale_objects: Default::default(),
            };
            self.write_checkpoint(&versioning_guard.checkpoint).await?;
            checkpoint_version
        };
        let mut applied_delta_count = 0;
        let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
        tracing::info!(
            total_delta = hummock_version_deltas.len(),
            total_to_apply,
            "Start redo Hummock version."
        );
        // Replay deltas newer than the checkpoint, in id order; ids must form an
        // unbroken chain (`prev_id` of each delta equals the current state id).
        for version_delta in hummock_version_deltas
            .range(redo_state.id + 1..)
            .map(|(_, v)| v)
        {
            assert_eq!(
                version_delta.prev_id, redo_state.id,
                "delta prev_id {}, redo state id {}",
                version_delta.prev_id, redo_state.id
            );
            redo_state.apply_version_delta(version_delta);
            applied_delta_count += 1;
            // Log progress every 1000 deltas so long replays remain observable.
            if applied_delta_count % 1000 == 0 {
                tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
            }
        }
        tracing::info!("Finish redo Hummock version.");
        versioning_guard.version_stats = hummock_version_stats::Entity::find()
            .one(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .map(HummockVersionStats::from)
            .unwrap_or_else(|| HummockVersionStats {
                // version_stats.hummock_version_id is always 0 in meta store.
                hummock_version_id: 0.into(),
                ..Default::default()
            });

        versioning_guard.current_version = redo_state;
        versioning_guard.hummock_version_deltas = hummock_version_deltas;

        context_info.pinned_versions = hummock_pinned_version::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.context_id as HummockContextId, m.into()))
            .collect();

        self.initial_compaction_group_config_after_load(
            versioning_guard,
            self.compaction_group_manager.write().await.deref_mut(),
        )
        .await?;

        Ok(())
    }

    /// Legacy entry point kept for interface compatibility; always panics because the
    /// kv meta store it served is deprecated.
    pub fn init_metadata_for_version_replay(
        &self,
        _table_catalogs: Vec<Table>,
        _compaction_groups: Vec<PbCompactionGroupInfo>,
    ) -> Result<()> {
        unimplemented!("kv meta store is deprecated");
    }

    /// Replay a version delta to current hummock version.
    /// Returns the new `HummockVersion` after applying the delta, and the ids of the
    /// compaction groups the delta modified.
    pub async fn replay_version_delta(
        &self,
        mut version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let mut versioning_guard = self.versioning.write().await;
        // ensure the version id is ascending after replay
        version_delta.id = versioning_guard.current_version.next_version_id();
        version_delta.prev_id = versioning_guard.current_version.id;
        versioning_guard
            .current_version
            .apply_version_delta(&version_delta);

        let version_new = versioning_guard.current_version.clone();
        let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
        Ok((version_new, compaction_group_ids))
    }

    /// Disables future epoch commits and returns the current version at that point.
    pub async fn disable_commit_epoch(&self) -> HummockVersion {
        let mut versioning_guard = self.versioning.write().await;
        versioning_guard.disable_commit_epochs = true;
        versioning_guard.current_version.clone()
    }

    /// Returns the metadata manager.
    pub fn metadata_manager(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    /// Returns the media type string of the backing object store.
    pub fn object_store_media_type(&self) -> &'static str {
        self.object_store.media_type()
    }

    /// Replaces the cached table-id → table-option mapping wholesale.
    pub fn update_table_id_to_table_option(
        &self,
        new_table_id_to_table_option: HashMap<TableId, TableOption>,
    ) {
        *self.table_id_to_table_option.write() = new_table_id_to_table_option;
    }

    /// Returns the metadata manager (alias of [`Self::metadata_manager`]).
    pub fn metadata_manager_ref(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    /// Subscribes to committed-epoch updates of `table_id`.
    ///
    /// Returns the table's current committed epoch together with a receiver that yields
    /// epochs committed afterwards. Errors if the table is unknown to the current version.
    pub async fn subscribe_table_committed_epoch(
        &self,
        table_id: TableId,
    ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
        let version = self.versioning.read().await;
        if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
            let (tx, rx) = unbounded_channel();
            self.table_committed_epoch_notifiers
                .lock()
                .txs
                .entry(table_id)
                .or_default()
                .push(tx);
            Ok((epoch, rx))
        } else {
            Err(anyhow!("table {} not exist", table_id).into())
        }
    }
}
611
612async fn write_exclusive_cluster_id(
613    state_store_dir: &str,
614    cluster_id: ClusterId,
615    object_store: ObjectStoreRef,
616) -> Result<()> {
617    const CLUSTER_ID_DIR: &str = "cluster_id";
618    const CLUSTER_ID_NAME: &str = "0";
619    let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
620    let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
621    tracing::info!("try reading cluster_id");
622    match object_store.read(&cluster_id_full_path, ..).await {
623        Ok(stored_cluster_id) => {
624            let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
625            if cluster_id.deref() == stored_cluster_id {
626                return Ok(());
627            }
628
629            Err(ObjectError::internal(format!(
630                "Data directory is already used by another cluster with id {:?}, path {}.",
631                stored_cluster_id, cluster_id_full_path,
632            ))
633            .into())
634        }
635        Err(e) => {
636            if e.is_object_not_found_error() {
637                tracing::info!("cluster_id not found, writing cluster_id");
638                object_store
639                    .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
640                    .await?;
641                return Ok(());
642            }
643            Err(e.into())
644        }
645    }
646}