risingwave_meta/hummock/manager/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18use std::sync::atomic::AtomicBool;
19
20use anyhow::anyhow;
21use bytes::Bytes;
22use itertools::Itertools;
23use parking_lot::lock_api::RwLock;
24use risingwave_common::catalog::{TableId, TableOption};
25use risingwave_common::monitor::MonitoredRwLock;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_hummock_sdk::change_log::TableChangeLog;
28use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
29use risingwave_hummock_sdk::{
30    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockVersionId,
31    version_archive_dir, version_checkpoint_path,
32};
33use risingwave_meta_model::{
34    compaction_status, compaction_task, hummock_pinned_version, hummock_version_delta,
35    hummock_version_stats,
36};
37use risingwave_pb::hummock::compact_task::TaskStatus;
38use risingwave_pb::hummock::{
39    HummockVersionStats, PbCompactTaskAssignment, PbCompactionGroupInfo,
40    SubscribeCompactionEventRequest,
41};
42use table_write_throughput_statistic::TableWriteThroughputStatisticManager;
43use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
44use tokio::sync::{Mutex, Semaphore, oneshot};
45use tonic::Streaming;
46
47use crate::MetaResult;
48use crate::hummock::CompactorManagerRef;
49use crate::hummock::compaction::CompactStatus;
50use crate::hummock::error::Result;
51use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
52use crate::hummock::manager::context::ContextInfo;
53use crate::hummock::manager::gc::{FullGcState, GcManager};
54use crate::hummock::manager::sequence::PrefetchedSequence;
55use crate::hummock::model::ext::to_table_change_log;
56use crate::manager::{MetaSrvEnv, MetadataManager};
57use crate::model::{ClusterId, MetadataModelError};
58use crate::rpc::metrics::MetaMetrics;
59
60mod context;
61mod gc;
62mod tests;
63mod versioning;
64pub use context::HummockVersionSafePoint;
65use versioning::*;
66pub(crate) mod checkpoint;
67mod commit_epoch;
68mod compaction;
69pub mod sequence;
70pub mod table_write_throughput_statistic;
71pub mod time_travel;
72mod timer_task;
73mod transaction;
74mod utils;
75mod worker;
76
77pub use commit_epoch::{CommitEpochInfo, NewTableFragmentInfo};
78pub use compaction::compaction_event_loop::*;
79use compaction::*;
80pub use compaction::{GroupState, GroupStateValidator, ManualCompactionTriggerResult};
81pub(crate) use utils::*;
82
/// Registry of subscribers waiting for per-table committed-epoch advances.
struct TableCommittedEpochNotifiers {
    // One sender per subscriber; entries are pruned in `notify_deltas` when the
    // table is dropped or when all receivers have been dropped.
    txs: HashMap<TableId, Vec<UnboundedSender<u64>>>,
}
86
87impl TableCommittedEpochNotifiers {
88    fn notify_deltas(&mut self, deltas: &[HummockVersionDelta]) {
89        self.txs.retain(|table_id, txs| {
90            let mut is_dropped = false;
91            let mut committed_epoch = None;
92            for delta in deltas {
93                if delta.removed_table_ids.contains(table_id) {
94                    is_dropped = true;
95                    break;
96                }
97                if let Some(info) = delta.state_table_info_delta.get(table_id) {
98                    committed_epoch = Some(info.committed_epoch);
99                }
100            }
101            if is_dropped {
102                false
103            } else if let Some(committed_epoch) = committed_epoch {
104                txs.retain(|tx| tx.send(committed_epoch).is_ok());
105                !txs.is_empty()
106            } else {
107                true
108            }
109        })
110    }
111}
112
/// Outcome of a reported compaction task, delivered to oneshot subscribers
/// registered in `CompactionTaskReportNotifiers`.
#[derive(Clone, Debug)]
struct CompactionTaskReportResult {
    // Id of the compaction task this result belongs to.
    task_id: HummockCompactionTaskId,
    #[allow(dead_code)] // only read via `Debug` formatting for now
    task_status: TaskStatus,
    // NOTE(review): presumably whether the task result was actually applied /
    // reported successfully — confirm at the `notify` call sites.
    reported: bool,
}
120
/// Registry of oneshot subscribers waiting for a specific compaction task's report.
struct CompactionTaskReportNotifiers {
    // All senders for a task are consumed (and the entry removed) on `notify`.
    txs: HashMap<HummockCompactionTaskId, Vec<oneshot::Sender<CompactionTaskReportResult>>>,
}
124
125impl CompactionTaskReportNotifiers {
126    fn register(
127        &mut self,
128        task_id: HummockCompactionTaskId,
129        tx: oneshot::Sender<CompactionTaskReportResult>,
130    ) {
131        self.txs.entry(task_id).or_default().push(tx);
132    }
133
134    fn remove(&mut self, task_id: HummockCompactionTaskId) {
135        self.txs.remove(&task_id);
136    }
137
138    fn notify(&mut self, result: CompactionTaskReportResult) {
139        if let Some(txs) = self.txs.remove(&result.task_id) {
140            for tx in txs {
141                let _ = tx.send(result.clone());
142            }
143        }
144    }
145}
// Updates to states are performed as follows:
147// - Initialize ValTransaction for the meta state to update
148// - Make changes on the ValTransaction.
149// - Call `commit_multi_var` to commit the changes via meta store transaction. If transaction
150//   succeeds, the in-mem state will be updated by the way.
/// Central meta-node manager of Hummock storage state: versioning, compaction
/// scheduling, checkpointing and garbage collection.
pub struct HummockManager {
    pub env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    /// Lock order: `compaction`, `versioning`, `compaction_group_manager`, `context_info`
    /// - Lock `compaction` first, then `versioning`, then `compaction_group_manager` and finally `context_info`.
    /// - This order should be strictly followed to prevent deadlock.
    compaction: MonitoredRwLock<Compaction>,
    versioning: MonitoredRwLock<Versioning>,
    /// `CompactionGroupManager` manages compaction configs for compaction groups.
    compaction_group_manager: MonitoredRwLock<CompactionGroupManager>,
    context_info: MonitoredRwLock<ContextInfo>,

    pub metrics: Arc<MetaMetrics>,

    pub compactor_manager: CompactorManagerRef,
    pub iceberg_compactor_manager: Arc<IcebergCompactorManager>,
    /// Sends events to the worker loop started via `start_worker` in `new_impl`.
    event_sender: HummockManagerEventSender,
    /// Object store backing the version checkpoint/archive and GC scans.
    object_store: ObjectStoreRef,
    version_checkpoint_path: String,
    version_archive_dir: String,
    /// When set, version checkpointing is paused.
    pause_version_checkpoint: AtomicBool,
    /// Per-table write-throughput statistics; retention window is sized from the
    /// split/merge throughput-stat options (see `new_impl`).
    table_write_throughput_statistic_manager:
        parking_lot::RwLock<TableWriteThroughputStatisticManager>,
    /// Subscribers notified of per-table committed-epoch advances.
    table_committed_epoch_notifiers: parking_lot::Mutex<TableCommittedEpochNotifiers>,
    /// Oneshot subscribers notified when a compaction task's result is reported.
    compaction_task_report_notifiers: parking_lot::Mutex<CompactionTaskReportNotifiers>,

    // for compactor
    // `compactor_streams_change_tx` is used to pass the mapping from `context_id` to event_stream
    // and is maintained in memory. All event_streams are consumed through a separate event loop
    compactor_streams_change_tx:
        UnboundedSender<(HummockContextId, Streaming<SubscribeCompactionEventRequest>)>,

    // `compaction_state` will record the types of compact tasks that can be triggered in `hummock`
    // and suggest types with a certain priority.
    pub compaction_state: CompactionState,
    full_gc_state: Arc<FullGcState>,
    /// Last known `now` value, loaded from the meta store at startup
    /// (see `load_meta_store_state`).
    now: Mutex<u64>,
    /// Bounds the number of concurrently served time-travel queries
    /// (permits = `max_inflight_time_travel_query`).
    inflight_time_travel_query: Semaphore,
    gc_manager: GcManager,

    /// In-memory cache of prefetched compaction task ids to reduce per-task DB round-trips.
    prefetched_compaction_task_ids: PrefetchedSequence,
    /// Cached table-id → `TableOption` map, replaced wholesale by
    /// `update_table_id_to_table_option`.
    table_id_to_table_option: parking_lot::RwLock<HashMap<TableId, TableOption>>,
}
196
/// Shared, reference-counted handle to a [`HummockManager`].
pub type HummockManagerRef = Arc<HummockManager>;
198
199use risingwave_object_store::object::{ObjectError, ObjectStoreRef, build_remote_object_store};
200use risingwave_pb::catalog::Table;
201
/// Starts a timer on the `hummock_manager_real_process_time` histogram, labeled
/// with `$func_name`. The returned prometheus timer guard records the elapsed
/// wall-clock time when it is dropped, so bind it for the duration to measure.
macro_rules! start_measure_real_process_timer {
    ($hummock_mgr:expr, $func_name:literal) => {
        $hummock_mgr
            .metrics
            .hummock_manager_real_process_time
            .with_label_values(&[$func_name])
            .start_timer()
    };
}
211pub(crate) use start_measure_real_process_timer;
212
213use super::IcebergCompactorManager;
214use crate::controller::SqlMetaStore;
215use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
216use crate::hummock::manager::worker::HummockManagerEventSender;
217
impl HummockManager {
    /// Creates a [`HummockManager`], loading compaction group configs from the
    /// meta store, then delegates to [`Self::new_impl`].
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let compaction_group_manager = CompactionGroupManager::new(&env).await?;
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
    }

    /// Test-only constructor: builds the `CompactionGroupManager` from the given
    /// compaction `config` instead of the persisted one. Panics on any failure.
    #[cfg(any(test, feature = "test"))]
    pub(super) async fn with_config(
        env: MetaSrvEnv,
        cluster_controller: crate::controller::cluster::ClusterControllerRef,
        catalog_controller: crate::controller::catalog::CatalogControllerRef,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        config: risingwave_pb::hummock::CompactionConfig,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> HummockManagerRef {
        let compaction_group_manager = CompactionGroupManager::new_with_config(&env, config)
            .await
            .unwrap();
        let metadata_manager = MetadataManager::new(cluster_controller, catalog_controller);
        Self::new_impl(
            env,
            metadata_manager,
            metrics,
            compactor_manager,
            compaction_group_manager,
            compactor_streams_change_tx,
        )
        .await
        .unwrap()
    }

    /// Shared construction path: builds the remote object store, verifies
    /// exclusive ownership of the data directory, assembles the manager, then
    /// restores all in-memory state from the meta store before returning.
    async fn new_impl(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        metrics: Arc<MetaMetrics>,
        compactor_manager: CompactorManagerRef,
        compaction_group_manager: CompactionGroupManager,
        compactor_streams_change_tx: UnboundedSender<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Result<HummockManagerRef> {
        let sys_params = env.system_params_reader().await;
        let state_store_url = sys_params.state_store();

        let state_store_dir: &str = sys_params.data_directory();
        let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy();
        let deterministic_mode = env.opts.compaction_deterministic_test;
        let mut object_store_config = env.opts.object_store_config.clone();
        // For fs and hdfs object store, operations are not always atomic.
        // We should manually enable atomicity guarantee by setting the atomic_write_dir config when building services.
        object_store_config.set_atomic_write_dir();
        let object_store = Arc::new(
            build_remote_object_store(
                // Falls back to the in-memory store when the url has no "hummock+" prefix.
                state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
                metrics.object_store_metric.clone(),
                "Version Checkpoint",
                Arc::new(object_store_config),
            )
            .await,
        );
        // Make sure data dir is not used by another cluster.
        // Skip this check in e2e compaction test, which needs to start a secondary cluster with
        // same bucket
        if !deterministic_mode {
            write_exclusive_cluster_id(
                state_store_dir,
                env.cluster_id().clone(),
                object_store.clone(),
            )
            .await?;

            // config bucket lifecycle for new cluster.
            if let risingwave_object_store::object::ObjectStoreImpl::S3(s3) = object_store.as_ref()
                && !env.opts.do_not_config_object_storage_lifecycle
            {
                let is_bucket_expiration_configured =
                    s3.inner().configure_bucket_lifecycle(state_store_dir).await;
                if is_bucket_expiration_configured {
                    // Refuse to start: object expiration would silently delete live data.
                    return Err(ObjectError::internal("Cluster cannot start with object expiration configured for bucket because RisingWave data will be lost when object expiration kicks in.
                    Please disable object expiration and restart the cluster.")
                    .into());
                }
            }
        }
        let version_checkpoint_path = version_checkpoint_path(state_store_dir);
        let version_archive_dir = version_archive_dir(state_store_dir);
        // Channel feeding the manager's worker event loop (`start_worker` below).
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        let inflight_time_travel_query = env.opts.max_inflight_time_travel_query;
        let gc_manager = GcManager::new(
            object_store.clone(),
            state_store_dir,
            use_new_object_prefix_strategy,
        );

        // Statistics must be kept for the longer of the split/merge windows.
        let max_table_statistic_expired_time = std::cmp::max(
            env.opts.table_stat_throuput_window_seconds_for_split,
            env.opts.table_stat_throuput_window_seconds_for_merge,
        ) as i64;

        let iceberg_compactor_manager = Arc::new(IcebergCompactorManager::new());

        let instance = HummockManager {
            env,
            versioning: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::versioning",
            ),
            compaction: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::compaction",
            ),
            compaction_group_manager: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                compaction_group_manager,
                "hummock_manager::compaction_group_manager",
            ),
            context_info: MonitoredRwLock::new(
                metrics.hummock_manager_lock_time.clone(),
                Default::default(),
                "hummock_manager::context_info",
            ),
            metrics,
            metadata_manager,
            compactor_manager,
            iceberg_compactor_manager,
            event_sender: tx,
            object_store,
            version_checkpoint_path,
            version_archive_dir,
            pause_version_checkpoint: AtomicBool::new(false),
            table_write_throughput_statistic_manager: parking_lot::RwLock::new(
                TableWriteThroughputStatisticManager::new(max_table_statistic_expired_time),
            ),
            table_committed_epoch_notifiers: parking_lot::Mutex::new(
                TableCommittedEpochNotifiers {
                    txs: Default::default(),
                },
            ),
            compaction_task_report_notifiers: parking_lot::Mutex::new(
                CompactionTaskReportNotifiers {
                    txs: Default::default(),
                },
            ),
            compactor_streams_change_tx,
            compaction_state: CompactionState::new(),
            full_gc_state: FullGcState::new().into(),
            now: Mutex::new(0),
            inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize),
            gc_manager,
            prefetched_compaction_task_ids: PrefetchedSequence::new(),
            table_id_to_table_option: RwLock::new(HashMap::new()),
        };
        let instance = Arc::new(instance);
        instance.init_time_travel_state().await?;
        instance.may_fill_backward_table_change_logs().await?;

        instance.start_worker(rx);
        instance.load_meta_store_state().await?;
        instance.release_invalid_contexts().await?;
        // Release snapshots pinned by meta on restarting.
        instance.release_meta_context().await?;
        Ok(instance)
    }

    /// Shortcut for the SQL meta store handle owned by `env`.
    fn meta_store_ref(&self) -> &SqlMetaStore {
        self.env.meta_store_ref()
    }

    /// Load state from meta store.
    ///
    /// Restores the cached `now`, then acquires the `compaction`, `versioning`
    /// and `context_info` write locks (in the documented lock order) and
    /// delegates to `load_meta_store_state_impl`.
    async fn load_meta_store_state(&self) -> Result<()> {
        let now = self.load_now().await?;
        *self.now.lock().await = now.unwrap_or(0);

        let mut compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let mut context_info_guard = self.context_info.write().await;
        self.load_meta_store_state_impl(
            &mut compaction_guard,
            &mut versioning_guard,
            &mut context_info_guard,
        )
        .await
    }

    /// Load state from meta store.
    ///
    /// Restores compaction statuses, task assignments, version deltas, the
    /// version checkpoint (creating an initial one on first boot), version
    /// stats, table change logs and pinned versions, then replays all deltas
    /// newer than the checkpoint to rebuild `current_version`.
    async fn load_meta_store_state_impl(
        &self,
        compaction_guard: &mut Compaction,
        versioning_guard: &mut Versioning,
        context_info: &mut ContextInfo,
    ) -> Result<()> {
        use sea_orm::EntityTrait;
        let meta_store = self.meta_store_ref();
        // Per-compaction-group compaction status.
        let compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus> =
            compaction_status::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();
        if !compaction_statuses.is_empty() {
            compaction_guard.compaction_statuses = compaction_statuses;
        }

        // Outstanding compaction task assignments keyed by task id.
        compaction_guard.compact_task_assignment = compaction_task::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| {
                (
                    m.id as HummockCompactionTaskId,
                    PbCompactTaskAssignment::from(m),
                )
            })
            .collect();

        let hummock_version_deltas: BTreeMap<HummockVersionId, HummockVersionDelta> =
            hummock_version_delta::Entity::find()
                .all(&meta_store.conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| {
                    (
                        m.id,
                        HummockVersionDelta::from_persisted_protobuf_owned(m.into()),
                    )
                })
                .collect();

        let checkpoint = self.try_read_checkpoint().await?;
        // Start redo from the checkpoint version; on first boot, create and
        // persist an initial checkpoint built from the default compaction config.
        let mut redo_state = if let Some(c) = checkpoint {
            versioning_guard.checkpoint = c;
            versioning_guard.checkpoint.version.clone()
        } else {
            let default_compaction_config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config();
            let checkpoint_version = HummockVersion::create_init_version(default_compaction_config);
            tracing::info!("init hummock version checkpoint");
            versioning_guard.checkpoint = HummockVersionCheckpoint {
                version: checkpoint_version.clone(),
                stale_objects: Default::default(),
            };
            self.write_checkpoint(&versioning_guard.checkpoint).await?;
            checkpoint_version
        };
        let mut applied_delta_count = 0;
        let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
        tracing::info!(
            total_delta = hummock_version_deltas.len(),
            total_to_apply,
            "Start redo Hummock version."
        );
        // Replay deltas newer than the checkpoint, verifying the id chain is contiguous.
        for version_delta in hummock_version_deltas
            .range(redo_state.id + 1..)
            .map(|(_, v)| v)
        {
            assert_eq!(
                version_delta.prev_id, redo_state.id,
                "delta prev_id {}, redo state id {}",
                version_delta.prev_id, redo_state.id
            );
            redo_state.apply_version_delta(version_delta);
            applied_delta_count += 1;
            if applied_delta_count % 1000 == 0 {
                tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
            }
        }
        tracing::info!("Finish redo Hummock version.");
        versioning_guard.version_stats = hummock_version_stats::Entity::find()
            .one(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .map(HummockVersionStats::from)
            .unwrap_or_else(|| HummockVersionStats {
                // version_stats.hummock_version_id is always 0 in meta store.
                hummock_version_id: 0.into(),
                ..Default::default()
            });

        versioning_guard.current_version = redo_state;
        versioning_guard.hummock_version_deltas = hummock_version_deltas;
        // Rebuild per-table change logs, ordering each table's entries by checkpoint epoch.
        versioning_guard.table_change_log =
            risingwave_meta_model::hummock_table_change_log::Entity::find()
                .all(&self.env.meta_store_ref().conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.table_id, to_table_change_log(m)))
                .into_group_map()
                .into_iter()
                .map(|(table_id, unordered_change_logs)| {
                    (
                        table_id,
                        TableChangeLog::new(
                            unordered_change_logs
                                .into_iter()
                                .sorted_by_key(|l| l.checkpoint_epoch),
                        ),
                    )
                })
                .collect();

        // Restore per-context pinned versions.
        context_info.pinned_versions = hummock_pinned_version::Entity::find()
            .all(&meta_store.conn)
            .await
            .map_err(MetadataModelError::from)?
            .into_iter()
            .map(|m| (m.context_id as HummockContextId, m.into()))
            .collect();

        self.initial_compaction_group_config_after_load(
            versioning_guard,
            self.compaction_group_manager.write().await.deref_mut(),
        )
        .await?;

        Ok(())
    }

    /// Deprecated entry point: metadata initialization for version replay was
    /// only supported by the (now removed) kv meta store. Always panics.
    pub fn init_metadata_for_version_replay(
        &self,
        _table_catalogs: Vec<Table>,
        _compaction_groups: Vec<PbCompactionGroupInfo>,
    ) -> Result<()> {
        unimplemented!("kv meta store is deprecated");
    }

    /// Replay a version delta to current hummock version.
    /// Returns the new version and the ids of the compaction groups touched by
    /// the delta.
    pub async fn replay_version_delta(
        &self,
        mut version_delta: HummockVersionDelta,
    ) -> Result<(HummockVersion, Vec<CompactionGroupId>)> {
        let mut versioning_guard = self.versioning.write().await;
        // ensure the version id is ascending after replay
        version_delta.id = versioning_guard.current_version.next_version_id();
        version_delta.prev_id = versioning_guard.current_version.id;
        versioning_guard
            .current_version
            .apply_version_delta(&version_delta);

        let version_new = versioning_guard.current_version.clone();
        let compaction_group_ids = version_delta.group_deltas.keys().cloned().collect_vec();
        Ok((version_new, compaction_group_ids))
    }

    /// Sets `disable_commit_epochs` and returns a snapshot of the current version.
    pub async fn disable_commit_epoch(&self) -> HummockVersion {
        let mut versioning_guard = self.versioning.write().await;
        versioning_guard.disable_commit_epochs = true;
        versioning_guard.current_version.clone()
    }

    pub fn metadata_manager(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    /// Returns the media type string of the backing object store.
    pub fn object_store_media_type(&self) -> &'static str {
        self.object_store.media_type()
    }

    /// Replaces the cached table-id → `TableOption` mapping wholesale.
    pub fn update_table_id_to_table_option(
        &self,
        new_table_id_to_table_option: HashMap<TableId, TableOption>,
    ) {
        *self.table_id_to_table_option.write() = new_table_id_to_table_option;
    }

    // NOTE(review): identical to `metadata_manager()` above — presumably kept
    // for API compatibility; consider deduplicating.
    pub fn metadata_manager_ref(&self) -> &MetadataManager {
        &self.metadata_manager
    }

    /// Subscribes to committed-epoch updates for `table_id`.
    ///
    /// Returns the table's current committed epoch together with a receiver
    /// yielding subsequent committed epochs; errors if the table is absent from
    /// the current version.
    ///
    /// NOTE(review): the versioning read guard stays alive while the sender is
    /// registered, presumably so no update can slip between the epoch snapshot
    /// and the subscription — confirm against the delta-commit/notify path.
    pub async fn subscribe_table_committed_epoch(
        &self,
        table_id: TableId,
    ) -> MetaResult<(u64, UnboundedReceiver<u64>)> {
        let version = self.versioning.read().await;
        if let Some(epoch) = version.current_version.table_committed_epoch(table_id) {
            let (tx, rx) = unbounded_channel();
            self.table_committed_epoch_notifiers
                .lock()
                .txs
                .entry(table_id)
                .or_default()
                .push(tx);
            Ok((epoch, rx))
        } else {
            Err(anyhow!("table {} not exist", table_id).into())
        }
    }
}
639
640async fn write_exclusive_cluster_id(
641    state_store_dir: &str,
642    cluster_id: ClusterId,
643    object_store: ObjectStoreRef,
644) -> Result<()> {
645    const CLUSTER_ID_DIR: &str = "cluster_id";
646    const CLUSTER_ID_NAME: &str = "0";
647    let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
648    let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
649    tracing::info!("try reading cluster_id");
650    match object_store.read(&cluster_id_full_path, ..).await {
651        Ok(stored_cluster_id) => {
652            let stored_cluster_id = String::from_utf8(stored_cluster_id.to_vec()).unwrap();
653            if cluster_id.deref() == stored_cluster_id {
654                return Ok(());
655            }
656
657            Err(ObjectError::internal(format!(
658                "Data directory is already used by another cluster with id {:?}, path {}.",
659                stored_cluster_id, cluster_id_full_path,
660            ))
661            .into())
662        }
663        Err(e) => {
664            if e.is_object_not_found_error() {
665                tracing::info!("cluster_id not found, writing cluster_id");
666                object_store
667                    .upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
668                    .await?;
669                return Ok(());
670            }
671            Err(e.into())
672        }
673    }
674}