// risingwave_meta/hummock/manager/mod.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19
20use anyhow::anyhow;
21use bytes::Bytes;
22use itertools::Itertools;
23use parking_lot::lock_api::RwLock;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::monitor::MonitoredRwLock;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_hummock_sdk::change_log::TableChangeLog;
28use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
29use risingwave_hummock_sdk::{
30    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
31    version_archive_dir, version_checkpoint_path,
32};
33use risingwave_meta_model::{
34    compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
35    hummock_version_stats,
36};
37use risingwave_pb::hummock::compact_task::TaskStatus;
38use risingwave_pb::hummock::{
39    HummockVersionStats, PbCompactionGroupInfo, SubscribeCompactionEventRequest,
40};
41use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
42use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
43use tokio::sync::{Mutex, Semaphore, oneshot};
44use tonic::Streaming;
45
46use crate::MetaResult;
47use crate::hummock::CompactorManagerRef;
48use crate::hummock::compaction::CompactStatus;
49use crate::hummock::error::Result;
50use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
51use crate::hummock::manager::context::ContextInfo;
52use crate::hummock::manager::gc::{FullGcState, GcManager};
53use crate::hummock::manager::sequence::PrefetchedSequence;
54use crate::hummock::model::ext::{compaction_task_model_to_assignment, to_table_change_log};
55use crate::manager::{MetaSrvEnv, MetadataManager};
56use crate::model::{ClusterId, MetadataModelError};
57use crate::rpc::metrics::MetaMetrics;
58
59mod context;
60mod gc;
61mod tests;
62mod versioning;
63pub use context::HummockVersionSafePoint;
64use versioning::*;
65pub(crate) mod checkpoint;
66mod commit_epoch;
67mod compaction;
68pub mod sequence;
69pub mod table_write_throughput_statistic;
70pub mod time_travel;
71mod timer_task;
72mod transaction;
73mod utils;
74mod worker;
75
76pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
77pub use compaction::compaction_event_loop::*;
78use compaction::*;
79pub use compaction::{GroupState, GroupStateValidator, ManualCompactionTriggerResult};
80pub(crate) use utils::*;
81
82struct TableCommittedEpochNotifiers {
83    txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
84}
85
86impl TableCommittedEpochNotifiers {
87    fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
88        self.txs.retain(|table_id, txs| {
89            let mut is_dropped = false;
90            let mut committed_epoch = None;
91            for delta in deltas {
92                if delta.removed_table_ids.contains(table_id) {
93                    is_dropped = true;
94                    break;
95                }
96                if let Some(info) = delta.state_table_info_delta.get(table_id) {
97                    committed_epoch = Some(info.committed_epoch);
98                }
99            }
100            if is_dropped {
101                false
102            } else if let Some(committed_epoch) = committed_epoch {
103                txs.retain(|tx| tx.send(committed_epoch).is_ok());
104                !txs.is_empty()
105            } else {
106                true
107            }
108        })
109    }
110}
111
/// Result of a compaction task report. A clone is sent to every waiter registered for
/// `task_id` in `CompactionTaskReportNotifiers`.
#[derive(Clone, Debug)]
struct CompactionTaskReportResult {
    /// Id of the compaction task this result refers to.
    task_id: HummockCompactionTaskId,

    /// Status carried by the report.
    task_status: TaskStatus,
    /// NOTE(review): presumably whether the report was actually accepted/applied — confirm
    /// against the code that constructs this value.
    reported: bool,
}
119
120struct CompactionTaskReportNotifiers {
121    txs: HashMap<HummockCompactionTaskId, Vec<oneshot::Sender<CompactionTaskReportResult>>>,
122}
123
124impl CompactionTaskReportNotifiers {
125    fn register(
126        &mut self,
127        task_id: HummockCompactionTaskId,
128        tx: oneshot::Sender<CompactionTaskReportResult>,
129    ) {
130        self.txs.entry(task_id).or_default().push(tx);
131    }
132
133    fn remove(&mut self, task_id: HummockCompactionTaskId) {
134        self.txs.remove(&task_id);
135    }
136
137    fn notify(&mut self, result: CompactionTaskReportResult) {
138        if let Some(txs) = self.txs.remove(&result.task_id) {
139            for tx in txs {
140                let _ = tx.send(result.clone());
141            }
142        }
143    }
144}
// Updates to state are performed as follows:
// - Initialize a ValTransaction for the meta state to update.
// - Make changes on the ValTransaction.
// - Call `commit_multi_var` to commit the changes via a meta store transaction. If the
//   transaction succeeds, the in-memory state is updated as well.
/// Meta-side manager of Hummock state: the current version and its deltas, compaction
/// statuses and task assignments, compaction group configs, and per-context pinned versions.
pub struct HummockManager {
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    /// Lock order: `compaction`, `versioning`, `compaction_group_manager`, `context_info`
    /// - Lock `compaction` first, then `versioning`, then `compaction_group_manager` and finally `context_info`.
    /// - This order should be strictly followed to prevent deadlock.
    compaction: MonitoredRwLock<Compaction>,
    versioning: MonitoredRwLock<Versioning>,
    /// `CompactionGroupManager` manages compaction configs for compaction groups.
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    /// Per-context state, including the pinned versions restored from the meta store.
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    pub iceberg_compactor_manager: Arc<IcebergCompactorManager>,
    event_sender: HummockManagerEventSender,
    /// Object store opened for version checkpoints; also shared with the GC manager.
    object_store: ObjectStoreRef,
    /// Location of the version checkpoint object inside `object_store`.
    version_checkpoint_path: String,
    /// Directory inside `object_store` used for archived versions.
    version_archive_dir: String,
    /// Flag starting as `false`; presumably pauses version checkpointing when set — confirm.
    pause_version_checkpoint: AtomicBool,
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    /// Subscribers of per-table committed epochs.
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,
    /// Waiters for compaction task report results, keyed by task id.
    compaction_task_report_notifiers: parking_lot::Mutex<CompactionTaskReportNotifiers>,

    // for compactor
    // `compactor_streams_change_tx` is used to pass the mapping from `context_id` to event_stream
    // and is maintained in memory. All event_streams are consumed through a separate event loop
    compactor_streams_change_tx:
        UnboundedSender<(HummockContextId, Streaming<SubscribeCompactionEventRequest>)>,

    // `compaction_state` will record the types of compact tasks that can be triggered in `hummock`
    // and suggest types with a certain priority.
    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    /// "now" value restored from the meta store on startup (0 if none was stored).
    now: Mutex<u64>,
    /// Bounds concurrent time-travel queries (permits = `max_inflight_time_travel_query`).
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    /// In-memory cache of prefetched compaction task ids to reduce per-task DB round-trips.
    prefetched_compaction_task_ids: PrefetchedSequence,
    /// Per-table options; replaced wholesale by `update_table_id_to_table_option`.
    table_id_to_table_option: parking_lot::RwLock<HashMap<TableId, TableOption>>,
}
195
/// Shared, reference-counted handle to a [`HummockManager`].
pub type HummockManagerRef = Arc<HummockManager>;
197
198use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
199use risingwave_pb::catalog::Table;
200
/// Starts a timer on `$mgr.metrics.hummock_manager_real_process_time`, labelled with the
/// string literal `$label`, and evaluates to whatever `start_timer()` returns.
///
/// Bind the result to a named local (not `_`) so it lives for the span being measured.
macro_rules! start_measure_real_process_timer {
    ($mgr:expr, $label:literal) => {
        $mgr.metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$label])
            .start_timer()
    };
}
pub(crate) use start_measure_real_process_timer;
211
212use super::IcebergCompactorManager;
213use crate::controller::SqlMetaStore;
214use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
215use crate::hummock::manager::worker::HummockManagerEventSender;
216
217impl HummockManager {
218    pub async fn new(
219        env: MetaSrvEnv,
220        metadata_manager: MetadataManager,
221        metrics: Arc<MetaMetrics>,
222        compactor_manager: CompactorManagerRef,
223        compactor_streams_change_tx: UnboundedSender<(
224            HummockContextId,
225            Streaming<SubscribeCompactionEventRequest>,
226        )>,
227    ) -> Result<HummockManagerRef> {
228        let compaction_group_manager = CompactionGroupManager::new(&env).await?;
229        Self::new_impl(
230            env,
231            metadata_manager,
232            metrics,
233            compactor_manager,
234            compaction_group_manager,
235            compactor_streams_change_tx,
236        )
237        .await
238    }
239
240    #[cfg(any(test, feature = "test"))]
241    pub(super) async fn with_config(
242        env: MetaSrvEnv,
243        cluster_controller: crate::controller::cluster::ClusterControllerRef,
244        catalog_controller: crate::controller::catalog::CatalogControllerRef,
245        metrics: Arc<MetaMetrics>,
246        compactor_manager: CompactorManagerRef,
247        config: risingwave_pb::hummock::CompactionConfig,
248        compactor_streams_change_tx: UnboundedSender<(
249            HummockContextId,
250            Streaming<SubscribeCompactionEventRequest>,
251        )>,
252    ) -> HummockManagerRef {
253        let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
254            .await
255            .unwrap();
256        let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
257        Self::new_impl(
258            env,
259            metadata_manager,
260            metrics,
261            compactor_manager,
262            compaction_group_manager,
263            compactor_streams_change_tx,
264        )
265        .await
266        .unwrap()
267    }
268
269    async fn new_impl(
270        env: MetaSrvEnv,
271        metadata_manager: MetadataManager,
272        metrics: Arc<MetaMetrics>,
273        compactor_manager: CompactorManagerRef,
274        compaction_group_manager: CompactionGroupManager,
275        compactor_streams_change_tx: UnboundedSender<(
276            HummockContextId,
277            Streaming<SubscribeCompactionEventRequest>,
278        )>,
279    ) -> Result<HummockManagerRef> {
280        let sys_params = env.system_params_reader().await;
281        let state_store_url = sys_params.state_store();
282
283        let state_store_dir: &str = sys_params.data_directory();
284        let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
285        let deterministic_mode = env.opts.compaction_deterministic_test;
286        let mut object_store_config = env.opts.object_store_config.clone();
287        // For fs and hdfs object store, operations are not always atomic.
288        // We should manually enable atomicity guarantee by setting the atomic_write_dir config when building services.
289        object_store_config.set_atomic_write_dir();
290        let object_store = Arc::new(
291            build_remote_object_store(
292                state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
293                metrics.object_store_metric.clone(),
294                "Version Checkpoint",
295                Arc::new(object_store_config),
296            )
297            .await,
298        );
299        // Make sure data dir is not used by another cluster.
300        // Skip this check in e2e compaction test, which needs to start a secondary cluster with
301        // same bucket
302        if !deterministic_mode {
303            write_exclusive_cluster_id(
304                state_store_dir,
305                env.cluster_id().clone(),
306                object_store.clone(),
307            )
308            .await?;
309
310            // config bucket lifecycle for new cluster.
311            if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
312                && !env.opts.do_not_config_object_storage_lifecycle
313            {
314                let is_bucket_expiration_configured =
315                    s3.inner().configure_bucket_lifecycle(state_store_dir).await;
316                if is_bucket_expiration_configured {
317                    return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
318                    Please disable object expiration and restart the cluster.")
319                    .into());
320                }
321            }
322        }
323        let version_checkpoint_path = version_checkpoint_path(state_store_dir);
324        let version_archive_dir = version_archive_dir(state_store_dir);
325        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
326        let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
327        let gc_manager = GcManager::new(
328            object_store.clone(),
329            state_store_dir,
330            use_new_object_prefix_strategy,
331        );
332
333        let max_table_statistic_expired_time = std::cmp::max(
334            env.opts.table_stat_throuput_window_seconds_for_split,
335            env.opts.table_stat_throuput_window_seconds_for_merge,
336        ) as i64;
337
338        let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());
339
340        let instance = HummockManager {
341            env,
342            versioning: MonitoredRwLock::new(
343                metrics.hummock_manager_lock_time.clone(),
344                Default::default(),
345                "hummock_manager::versioning",
346            ),
347            compaction: MonitoredRwLock::new(
348                metrics.hummock_manager_lock_time.clone(),
349                Default::default(),
350                "hummock_manager::compaction",
351            ),
352            compaction_group_manager: MonitoredRwLock::new(
353                metrics.hummock_manager_lock_time.clone(),
354                compaction_group_manager,
355                "hummock_manager::compaction_group_manager",
356            ),
357            context_info: MonitoredRwLock::new(
358                metrics.hummock_manager_lock_time.clone(),
359                Default::default(),
360                "hummock_manager::context_info",
361            ),
362            metrics,
363            metadata_manager,
364            compactor_manager,
365            iceberg_compactor_manager,
366            event_sender: tx,
367            object_store,
368            version_checkpoint_path,
369            version_archive_dir,
370            pause_version_checkpoint: AtomicBool::new(false),
371            table_write_throughput_statistic_manager: parking_lot::RwLock::new(
372                TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
373            ),
374            table_committed_epoch_notifiers: parking_lot::Mutex::new(
375                TableCommittedEpochNotifiers {
376                    txs: Default::default(),
377                },
378            ),
379            compaction_task_report_notifiers: parking_lot::Mutex::new(
380                CompactionTaskReportNotifiers {
381                    txs: Default::default(),
382                },
383            ),
384            compactor_streams_change_tx,
385            compaction_state: CompactionState::new(),
386            full_gc_state: FullGcState::new().into(),
387            now: Mutex::new(0),
388            inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
389            gc_manager,
390            prefetched_compaction_task_ids: PrefetchedSequence::new(),
391            table_id_to_table_option: RwLock::new(HashMap::new()),
392        };
393        let instance = Arc::new(instance);
394        instance.init_time_travel_state().await?;
395        instance.may_fill_backward_table_change_logs().await?;
396
397        instance.start_worker(rx);
398        instance.load_meta_store_state().await?;
399        instance.release_invalid_contexts().await?;
400        // Release snapshots pinned by meta on restarting.
401        instance.release_meta_context().await?;
402        Ok(instance)
403    }
404
405    fn meta_store_ref(&self) -> &SqlMetaStore {
406        self.env.meta_store_ref()
407    }
408
409    /// Load state from meta store.
410    async fn load_meta_store_state(&self) -> Result<()> {
411        let now = self.load_now().await?;
412        *self.now.lock().await = now.unwrap_or(0);
413
414        let mut compaction_guard = self.compaction.write().await;
415        let mut versioning_guard = self.versioning.write().await;
416        let mut context_info_guard = self.context_info.write().await;
417        self.load_meta_store_state_impl(
418            &mut compaction_guard,
419            &mut versioning_guard,
420            &mut context_info_guard,
421        )
422        .await
423    }
424
425    /// Load state from meta store.
426    async fn load_meta_store_state_impl(
427        &self,
428        compaction_guard: &mut Compaction,
429        versioning_guard: &mut Versioning,
430        context_info: &mut ContextInfo,
431    ) -> Result<()> {
432        use sea_orm::EntityTrait;
433        let meta_store = self.meta_store_ref();
434        let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
435            compaction_status::Entity::find()
436                .all(&meta_store.conn)
437                .await
438                .map_err(MetadataModelError::from)?
439                .into_iter()
440                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
441                .collect();
442        if !compaction_statuses.is_empty() {
443            compaction_guard.compaction_statuses = compaction_statuses;
444        }
445
446        compaction_guard.compact_task_assignment = compaction_task::Entity::find()
447            .all(&meta_store.conn)
448            .await
449            .map_err(MetadataModelError::from)?
450            .into_iter()
451            .map(|m| {
452                (
453                    m.id as HummockCompactionTaskId,
454                    compaction_task_model_to_assignment(m),
455                )
456            })
457            .collect();
458
459        let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
460            hummock_version_delta::Entity::find()
461                .all(&meta_store.conn)
462                .await
463                .map_err(MetadataModelError::from)?
464                .into_iter()
465                .map(|m| {
466                    (
467                        m.id,
468                        HummockVersionDelta::from_persisted_protobuf_owned(m.into()),
469                    )
470                })
471                .collect();
472
473        let checkpoint = self.try_read_checkpoint().await?;
474        let mut redo_state = if let Some(c) = checkpoint {
475            versioning_guard.checkpoint = c;
476            versioning_guard.checkpoint.version.clone()
477        } else {
478            let default_compaction_config = self
479                .compaction_group_manager
480                .read()
481                .await
482                .default_compaction_config();
483            let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
484            tracing::info!("init hummock version checkpoint");
485            versioning_guard.checkpoint = HummockVersionCheckpoint {
486                version: checkpoint_version.clone(),
487                stale_objects: Default::default(),
488            };
489            self.write_checkpoint(&versioning_guard.checkpoint).await?;
490            checkpoint_version
491        };
492        let mut applied_delta_count = 0;
493        let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
494        tracing::info!(
495            total_delta = hummock_version_deltas.len(),
496            total_to_apply,
497            "Start redo Hummock version."
498        );
499        for version_delta in hummock_version_deltas
500            .range(redo_state.id + 1..)
501            .map(|(_, v)| v)
502        {
503            assert_eq!(
504                version_delta.prev_id, redo_state.id,
505                "delta prev_id {}, redo state id {}",
506                version_delta.prev_id, redo_state.id
507            );
508            redo_state.apply_version_delta(version_delta);
509            applied_delta_count += 1;
510            if applied_delta_count % 1000 == 0 {
511                tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
512            }
513        }
514        tracing::info!("Finish redo Hummock version.");
515        let pruned_stale_table_id_count = redo_state.prune_stale_table_ids_from_ssts();
516        if pruned_stale_table_id_count > 0 {
517            tracing::warn!(
518                pruned_stale_table_id_count,
519                version_id = ?redo_state.id,
520                "Pruned stale table ids from recovered Hummock SST metadata."
521            );
522        }
523        versioning_guard.version_stats = hummock_version_stats::Entity::find()
524            .one(&meta_store.conn)
525            .await
526            .map_err(MetadataModelError::from)?
527            .map(HummockVersionStats::from)
528            .unwrap_or_else(|| HummockVersionStats {
529                // version_stats.hummock_version_id is always 0 in meta store.
530                hummock_version_id: 0.into(),
531                ..Default::default()
532            });
533
534        versioning_guard.current_version = redo_state;
535        versioning_guard.hummock_version_deltas = hummock_version_deltas;
536        versioning_guard.table_change_log =
537            risingwave_meta_model::hummock_table_change_log::Entity::find()
538                .all(&self.env.meta_store_ref().conn)
539                .await
540                .map_err(MetadataModelError::from)?
541                .into_iter()
542                .map(|m| (m.table_id, to_table_change_log(m)))
543                .into_group_map()
544                .into_iter()
545                .map(|(table_id, unordered_change_logs)| {
546                    (
547                        table_id,
548                        TableChangeLog::new(
549                            unordered_change_logs
550                                .into_iter()
551                                .sorted_by_key(|l| l.checkpoint_epoch),
552                        ),
553                    )
554                })
555                .collect();
556
557        context_info.pinned_versions = hummock_pinned_version::Entity::find()
558            .all(&meta_store.conn)
559            .await
560            .map_err(MetadataModelError::from)?
561            .into_iter()
562            .map(|m| (m.context_id as HummockContextId, m.into()))
563            .collect();
564
565        self.initial_compaction_group_config_after_load(
566            versioning_guard,
567            self.compaction_group_manager.write().await.deref_mut(),
568        )
569        .await?;
570
571        Ok(())
572    }
573
574    pub fn init_metadata_for_version_replay(
575        &self,
576        _table_catalogs: Vec<Table>,
577        _compaction_groups: Vec<PbCompactionGroupInfo>,
578    ) -> Result<()> {
579        unimplemented!("kv meta store is deprecated");
580    }
581
582    /// Replay a version delta to current hummock version.
583    /// Returns the `version_id`, `max_committed_epoch` of the new version and the modified
584    /// compaction groups
585    pub async fn replay_version_delta(
586        &self,
587        mut version_delta: HummockVersionDelta,
588    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
589        let mut versioning_guard = self.versioning.write().await;
590        // ensure the version id is ascending after replay
591        version_delta.id = versioning_guard.current_version.next_version_id();
592        version_delta.prev_id = versioning_guard.current_version.id;
593        versioning_guard
594            .current_version
595            .apply_version_delta(&version_delta);
596
597        let version_new = versioning_guard.current_version.clone();
598        let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
599        Ok((version_new, compaction_group_ids))
600    }
601
602    pub async fn disable_commit_epoch(&self) -> HummockVersion {
603        let mut versioning_guard = self.versioning.write().await;
604        versioning_guard.disable_commit_epochs = true;
605        versioning_guard.current_version.clone()
606    }
607
608    pub fn metadata_manager(&self) -> &MetadataManager {
609        &self.metadata_manager
610    }
611
612    pub fn object_store_media_type(&self) -> &'static str {
613        self.object_store.media_type()
614    }
615
616    pub fn update_table_id_to_table_option(
617        &self,
618        new_table_id_to_table_option: HashMap<TableId, TableOption>,
619    ) {
620        *self.table_id_to_table_option.write() = new_table_id_to_table_option;
621    }
622
623    pub fn metadata_manager_ref(&self) -> &MetadataManager {
624        &self.metadata_manager
625    }
626
627    pub async fn subscribe_table_committed_epoch(
628        &self,
629        table_id: TableId,
630    ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
631        let version = self.versioning.read().await;
632        if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
633            let (tx, rx) = unbounded_channel();
634            self.table_committed_epoch_notifiers
635                .lock()
636                .txs
637                .entry(table_id)
638                .or_default()
639                .push(tx);
640            Ok((epoch, rx))
641        } else {
642            Err(anyhow!("table {} not exist", table_id).into())
643        }
644    }
645}
646
647async fn write_exclusive_cluster_id(
648    state_store_dir: &str,
649    cluster_id: ClusterId,
650    object_store: ObjectStoreRef,
651) -> Result<()> {
652    const CLUSTER_ID_DIR: &str = "cluster_id";
653    const CLUSTER_ID_NAME: &str = "0";
654    let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
655    let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
656    tracing::info!("try reading cluster_id");
657    match object_store.read(&cluster_id_full_path, ..).await {
658        Ok(stored_cluster_id) => {
659            let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
660            if cluster_id.deref() == stored_cluster_id {
661                return Ok(());
662            }
663
664            Err(ObjectError::internal(format!(
665                "Data directory is already used by another cluster with id {:?}, path {}.",
666                stored_cluster_id, cluster_id_full_path,
667            ))
668            .into())
669        }
670        Err(e) => {
671            if e.is_object_not_found_error() {
672                tracing::info!("cluster_id not found, writing cluster_id");
673                object_store
674                    .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
675                    .await?;
676                return Ok(());
677            }
678            Err(e.into())
679        }
680    }
681}