risingwave_compaction_test/compaction_test_runner.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashSet};
use std::net::SocketAddr;
use std::ops::Bound;
use std::pin::Pin;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;

use anyhow::anyhow;
use bytes::{BufMut, Bytes, BytesMut};
use clap::Parser;
use foyer::Hint;
use risingwave_common::catalog::TableOption;
use risingwave_common::config::{
    MetaConfig, NoOverride, extract_storage_memory_config, load_config,
};
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockReadEpoch, HummockVersionId,
};
use risingwave_pb::common::WorkerType;
use risingwave_pb::id::TableId;
use risingwave_rpc_client::{HummockMetaClient, MetaClient};
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::{CachePolicy, HummockStorage};
use risingwave_storage::monitor::{
    CompactorMetrics, HummockMetrics, HummockStateStoreMetrics, MonitoredStateStore,
    MonitoredStorageMetrics, ObjectStoreMetrics,
};
use risingwave_storage::opts::StorageOpts;
use risingwave_storage::store::{NewReadSnapshotOptions, ReadOptions, StateStoreRead};
use risingwave_storage::{StateStore, StateStoreImpl, StateStoreIter};

use crate::CompactionTestOpts;

/// Number of SST object ids to pre-allocate from the embedded meta, so that
/// SSTs produced during replay do not collide with those of the original cluster.
const SST_ID_SHIFT_COUNT: u32 = 1_000_000;
/// A checkpoint frequency large enough that the embedded meta never commits a
/// new epoch while version deltas are being replayed.
const CHECKPOINT_FREQ_FOR_REPLAY: u64 = 99999999;

struct CompactionTestMetrics {
    /// Number of result checks we expected to perform.
    num_expect_check: u64,
    /// Number of checks skipped because compaction produced no new version.
    num_uncheck: u64,
}

impl CompactionTestMetrics {
    fn new() -> Self {
        Self {
            num_expect_check: 0,
            num_uncheck: 0,
        }
    }
}

/// Steps to use the compaction test tool
///
/// 1. Start the cluster with the ci-compaction-test config: `./risedev d ci-compaction-test`
/// 2. Ingest enough L0 SSTs, for example with the tpch-bench tool
/// 3. Disable hummock manager commit of new epochs: `./risedev ctl hummock disable-commit-epoch`,
///    which prints the current max committed epoch in Meta.
/// 4. Use the test tool to replay hummock version deltas and trigger compactions:
///    `./risedev compaction-test --state-store hummock+s3://your-bucket -t <table_id>`
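///
/// After replaying, the tool repeatedly triggers deterministic compaction and
/// verifies that scans over the same epochs return identical results before
/// and after compaction.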
pub async fn compaction_test_main(
    _listen_addr: SocketAddr,
    advertise_addr: HostAddr,
    opts: CompactionTestOpts,
) -> anyhow::Result<()> {
    let meta_listen_addr = opts
        .meta_address
        .strip_prefix("http://")
        .unwrap()
        .to_owned();

    let _meta_handle = tokio::spawn(start_meta_node(
        meta_listen_addr.clone(),
        opts.state_store.clone(),
        opts.config_path_for_meta.clone(),
    ));

    // Wait for meta to start
    tokio::time::sleep(Duration::from_secs(1)).await;
    tracing::info!("Started embedded Meta");

    let (compactor_thrd, compactor_shutdown_tx) = start_compactor_thread(
        opts.meta_address.clone(),
        advertise_addr.to_string(),
        opts.config_path.clone(),
    );

    let original_meta_endpoint = "http://127.0.0.1:5690";
    let mut table_id = opts.table_id.into();

    init_metadata_for_replay(
        original_meta_endpoint,
        &opts.meta_address,
        &advertise_addr,
        opts.ci_mode,
        &mut table_id,
    )
    .await?;

    assert_ne!(table_id, 0, "Invalid table_id for correctness checking");

    let version_deltas = pull_version_deltas(original_meta_endpoint, &advertise_addr).await?;

    tracing::info!(
        "Pulled delta logs from Meta: len(logs): {}",
        version_deltas.len()
    );

    let replay_thrd = start_replay_thread(opts, table_id, version_deltas);
    replay_thrd.join().unwrap();
    compactor_shutdown_tx.send(()).unwrap();
    compactor_thrd.join().unwrap();
    Ok(())
}

pub async fn start_meta_node(listen_addr: String, state_store: String, config_path: String) {
    let meta_opts = risingwave_meta_node::MetaNodeOpts::parse_from([
        "meta-node",
        "--listen-addr",
        &listen_addr,
        "--advertise-addr",
        &listen_addr,
        "--backend",
        "mem",
        "--state-store",
        &state_store,
        "--config-path",
        &config_path,
    ]);
    let config = load_config(&meta_opts.config_path, &meta_opts);
    // We configure a large checkpoint frequency to prevent the embedded meta node
    // from committing new epochs, which would bump the hummock version during
    // version log replay.
    assert_eq!(
        CHECKPOINT_FREQ_FOR_REPLAY,
        config.system.checkpoint_frequency.unwrap()
    );
    assert!(
        config.meta.enable_compaction_deterministic,
        "enable_compaction_deterministic should be set"
    );

    risingwave_meta_node::start(meta_opts, CancellationToken::new() /* dummy */).await
}

async fn start_compactor_node(
    meta_rpc_endpoint: String,
    advertise_addr: String,
    config_path: String,
) {
    let opts = risingwave_compactor::CompactorOpts::parse_from([
        "compactor-node",
        "--listen-addr",
        "127.0.0.1:5550",
        "--advertise-addr",
        &advertise_addr,
        "--meta-address",
        &meta_rpc_endpoint,
        "--config-path",
        &config_path,
    ]);
    risingwave_compactor::start(opts, CancellationToken::new() /* dummy */).await
}

pub fn start_compactor_thread(
    meta_endpoint: String,
    advertise_addr: String,
    config_path: String,
) -> (JoinHandle<()>, std::sync::mpsc::Sender<()>) {
    let (tx, rx) = std::sync::mpsc::channel();
    let compact_func = move || {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            tokio::spawn(async {
                tracing::info!("Starting compactor node");
                start_compactor_node(meta_endpoint, advertise_addr, config_path).await
            });
            // Block until the test driver signals shutdown via `tx`.
            rx.recv().unwrap();
        });
    };

    (std::thread::spawn(compact_func), tx)
}

fn start_replay_thread(
    opts: CompactionTestOpts,
    table_id: TableId,
    version_deltas: Vec<HummockVersionDelta>,
) -> JoinHandle<()> {
    let replay_func = move || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime
            .block_on(start_replay(opts, table_id, version_deltas))
            .expect("replay error occurred");
    };

    std::thread::spawn(replay_func)
}

async fn init_metadata_for_replay(
    cluster_meta_endpoint: &str,
    new_meta_endpoint: &str,
    advertise_addr: &HostAddr,
    ci_mode: bool,
    table_id: &mut TableId,
) -> anyhow::Result<()> {
    // The compactor needs to receive catalog notifications from the new Meta node,
    // and we should wait until the compactor finishes setting up the subscription
    // channel before registering the table catalog to the new Meta node. Otherwise
    // the filter key manager will fail to acquire a key extractor.
    tokio::time::sleep(Duration::from_secs(2)).await;

    let meta_config = MetaConfig::default();
    let meta_client: MetaClient;
    tokio::select! {
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("Ctrl+C received, now exiting");
            std::process::exit(0);
        },
        ret = MetaClient::register_new(cluster_meta_endpoint.parse()?, WorkerType::RiseCtl, advertise_addr, Default::default(), Arc::new(meta_config.clone())) => {
            (meta_client, _) = ret;
        },
    }
    let worker_id = meta_client.worker_id();
    tracing::info!("Assigned init worker id {}", worker_id);
    meta_client.activate(advertise_addr).await.unwrap();

    let tables = meta_client.risectl_list_state_tables().await?;

    let (new_meta_client, _) = MetaClient::register_new(
        new_meta_endpoint.parse()?,
        WorkerType::RiseCtl,
        advertise_addr,
        Default::default(),
        Arc::new(meta_config.clone()),
    )
    .await;
    new_meta_client.activate(advertise_addr).await.unwrap();
    if ci_mode {
        let table_to_check = tables.iter().find(|t| t.name == "nexmark_q7").unwrap();
        *table_id = table_to_check.id;
    }

    // No need to init compaction_groups, because they will be created when
    // replaying version deltas.
    new_meta_client
        .init_metadata_for_replay(tables, vec![])
        .await?;

    // Shift the SST ids to avoid conflicts with the original meta node.
    let _ = new_meta_client
        .get_new_object_ids(SST_ID_SHIFT_COUNT)
        .await?;

    tracing::info!("Finished initializing the new Meta");
    Ok(())
}

async fn pull_version_deltas(
    cluster_meta_endpoint: &str,
    advertise_addr: &HostAddr,
) -> anyhow::Result<Vec<HummockVersionDelta>> {
    // Register to the cluster.
    // We reuse the RiseCtl worker type here.
    let (meta_client, _) = MetaClient::register_new(
        cluster_meta_endpoint.parse()?,
        WorkerType::RiseCtl,
        advertise_addr,
        Default::default(),
        Arc::new(MetaConfig::default()),
    )
    .await;
    let worker_id = meta_client.worker_id();
    tracing::info!("Assigned pull worker id {}", worker_id);
    meta_client.activate(advertise_addr).await.unwrap();

    let (handle, shutdown_tx) =
        MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
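    // Fetch all deltas in one call: start from the first version id, with
    // effectively no limits on the number of deltas or the committed epoch.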
    let res = meta_client
        .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
        .await
        .unwrap();

    if let Err(err) = shutdown_tx.send(()) {
        tracing::warn!("Failed to send shutdown to heartbeat task: {:?}", err);
    }
    handle.await?;
    tracing::info!("Shut down the pull worker");
    Ok(res)
}

async fn start_replay(
    opts: CompactionTestOpts,
    table_to_check: TableId,
    version_delta_logs: Vec<HummockVersionDelta>,
) -> anyhow::Result<()> {
    let advertise_addr = "127.0.0.1:7770".parse().unwrap();
    tracing::info!(
        "Start to replay. Advertise address is {}, Table id {}",
        advertise_addr,
        table_to_check
    );

    let mut metric = CompactionTestMetrics::new();
    let config = load_config(&opts.config_path_for_meta, NoOverride);
    tracing::info!(
        "Starting replay with config {:?} and opts {:?}",
        config,
        opts
    );

    // Register to the cluster.
    // We reuse the RiseCtl worker type here.
    let (meta_client, system_params) = MetaClient::register_new(
        opts.meta_address.parse()?,
        WorkerType::RiseCtl,
        &advertise_addr,
        Default::default(),
        Arc::new(config.meta.clone()),
    )
    .await;
    let worker_id = meta_client.worker_id();
    tracing::info!("Assigned replay worker id {}", worker_id);
    meta_client.activate(&advertise_addr).await.unwrap();

    let sub_tasks = vec![MetaClient::start_heartbeat_loop(
        meta_client.clone(),
        Duration::from_millis(1000),
    )];

    // Prevent the embedded meta from committing new epochs during version replay.
    let latest_version = meta_client.disable_commit_epoch().await?;
    assert_eq!(FIRST_VERSION_ID, latest_version.id);
    // The new meta should not have any data at this time.
    for level in latest_version.levels.values() {
        level.levels.iter().for_each(|lvl| {
            assert!(lvl.table_infos.is_empty());
            assert_eq!(0, lvl.total_file_size);
        });
    }

    // Create the hummock state store *after* we reset the hummock version.
    let storage_memory_config = extract_storage_memory_config(&config);
    let storage_opts = Arc::new(StorageOpts::from((
        &config,
        &system_params,
        &storage_memory_config,
    )));
    let hummock = create_hummock_store_with_metrics(&meta_client, storage_opts, &opts).await?;

    // Replay version deltas from FIRST_VERSION_ID to the version before reset.
    let mut modified_compaction_groups = HashSet::<CompactionGroupId>::new();
    let mut replay_count: u64 = 0;
    let mut replayed_epochs = vec![];
    let mut check_result_task: Option<tokio::task::JoinHandle<_>> = None;

    for delta in version_delta_logs {
        let (current_version, compaction_groups) = meta_client.replay_version_delta(delta).await?;
        let (version_id, committed_epoch) = (
            current_version.id,
            current_version
                .table_committed_epoch(table_to_check)
                .unwrap_or_default(),
        );
        tracing::info!(
            "Replayed version delta version_id: {}, committed_epoch: {}, compaction_groups: {:?}",
            version_id,
            committed_epoch,
            compaction_groups
        );

        hummock
            .inner()
            .update_version_and_wait(current_version.clone())
            .await;

        replay_count += 1;
        replayed_epochs.push(committed_epoch);
        modified_compaction_groups.extend(compaction_groups);

        // More conditions for triggering compaction could be added here; for now
        // we simply trigger once every `num_trigger_frequency` replayed deltas.
        if replay_count.is_multiple_of(opts.num_trigger_frequency)
            && !modified_compaction_groups.is_empty()
        {
            // Join the previously spawned check result task.
            if let Some(handle) = check_result_task {
                handle.await??;
            }

            metric.num_expect_check += 1;

            // Pop the latest epoch, since it is already included as `committed_epoch`.
            replayed_epochs.pop();
            let mut epochs = vec![committed_epoch];
            epochs.extend(pin_old_snapshots(&meta_client, &replayed_epochs, 1));
            tracing::info!("===== Prepare to check snapshots: {:?}", epochs);

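            // Open iterators over the data visible at these epochs *before*
            // compaction; after compaction we open iterators over the same epochs
            // and require the two views to match (see `check_compaction_results`).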
            let old_version_iters = open_hummock_iters(&hummock, &epochs, table_to_check).await?;

            tracing::info!(
                "Trigger compaction for version {}, epoch {} compaction_groups: {:?}",
                version_id,
                committed_epoch,
                modified_compaction_groups,
            );
            // Trigger multiple rounds of compaction, without waiting for them to finish.
            let is_multi_round = opts.num_trigger_rounds > 1;
            for _ in 0..opts.num_trigger_rounds {
                meta_client
                    .trigger_compaction_deterministic(
                        version_id,
                        Vec::from_iter(modified_compaction_groups.iter().copied()),
                    )
                    .await?;
                if is_multi_round {
                    tokio::time::sleep(Duration::from_millis(50)).await;
                }
            }

            let old_task_num = meta_client.get_assigned_compact_task_num().await?;
            // Poll for compaction task status.
            let (schedule_ok, version_diff) =
                poll_compaction_schedule_status(&meta_client, old_task_num).await;

            tracing::info!(
                "Compaction schedule_ok {}, version_diff {}",
                schedule_ok,
                version_diff,
            );
            let (compaction_ok, new_version) = poll_compaction_tasks_status(
                &meta_client,
                schedule_ok,
                version_diff as u32,
                &current_version,
            )
            .await;

            tracing::info!(
                "Compaction schedule_ok {}, version_diff {} compaction_ok {}",
                schedule_ok,
                version_diff,
                compaction_ok,
            );

            let new_version_id = new_version.id;
            assert!(
                new_version_id >= version_id,
                "new_version_id: {}",
                new_version_id,
            );

            if new_version_id != version_id {
                hummock.inner().update_version_and_wait(new_version).await;

                let new_version_iters =
                    open_hummock_iters(&hummock, &epochs, table_to_check).await?;

                // Spawn a task to check the results.
                check_result_task = Some(tokio::spawn(check_compaction_results(
                    new_version_id,
                    old_version_iters,
                    new_version_iters,
                )));
            } else {
                check_result_task = None;
                metric.num_uncheck += 1;
            }
            modified_compaction_groups.clear();
            replayed_epochs.clear();
        }
    }

    // Join the previously spawned check result task, if any.
    if let Some(handle) = check_result_task {
        handle.await??;
    }
    tracing::info!(
        "Replay finished. Expected check count: {}, actual check count: {}",
        metric.num_expect_check,
        metric.num_expect_check - metric.num_uncheck
    );

    assert_ne!(0, metric.num_expect_check - metric.num_uncheck);

    for (join_handle, shutdown_sender) in sub_tasks {
        if let Err(err) = shutdown_sender.send(()) {
            tracing::warn!("Failed to send shutdown: {:?}", err);
            continue;
        }
        if let Err(err) = join_handle.await {
            tracing::warn!("Failed to join shutdown: {:?}", err);
        }
    }

    Ok(())
}

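/// Selects the most recent `num` replayed epochs. Despite the name, this does
/// not pin snapshots via the meta client anymore (`_meta_client` is unused);
/// the returned epochs are only used to open iterators for the result check.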
fn pin_old_snapshots(
    _meta_client: &MetaClient,
    replayed_epochs: &[HummockEpoch],
    num: usize,
) -> Vec<HummockEpoch> {
    replayed_epochs.iter().rev().take(num).copied().collect()
}

/// Polls the compaction task assignment to determine whether scheduling succeeded.
/// Returns (whether scheduling succeeded, expected number of new versions).
async fn poll_compaction_schedule_status(
    meta_client: &MetaClient,
    old_task_num: usize,
) -> (bool, i32) {
    let poll_timeout = Duration::from_secs(2);
    let poll_interval = Duration::from_millis(20);
    let mut poll_duration_cnt = Duration::from_millis(0);
    let mut new_task_num = meta_client.get_assigned_compact_task_num().await.unwrap();
    let mut schedule_ok = false;
    loop {
        // A new task has been scheduled.
        if new_task_num > old_task_num {
            schedule_ok = true;
            break;
        }

        if poll_duration_cnt >= poll_timeout {
            break;
        }
        tokio::time::sleep(poll_interval).await;
        poll_duration_cnt += poll_interval;
        new_task_num = meta_client.get_assigned_compact_task_num().await.unwrap();
    }
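    // The assigned task count can move in either direction while we poll (tasks
    // may also finish and be unassigned), so use the absolute difference as an
    // estimate of how many new versions the compactions should produce.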
    (
        schedule_ok,
        (new_task_num as i32 - old_task_num as i32).abs(),
    )
}

async fn poll_compaction_tasks_status(
    meta_client: &MetaClient,
    schedule_ok: bool,
    version_diff: u32,
    base_version: &HummockVersion,
) -> (bool, HummockVersion) {
    // Poll the current version to check whether its id has grown, which means
    // the compaction tasks have finished. If scheduling succeeded, we poll for
    // a longer while.
    let poll_timeout = if schedule_ok {
        Duration::from_secs(120)
    } else {
        Duration::from_secs(5)
    };
    let poll_interval = Duration::from_millis(50);
    let mut duration_cnt = Duration::from_millis(0);
    let mut compaction_ok = false;

    let mut cur_version = meta_client.get_current_version().await.unwrap();
    loop {
        if (cur_version.id > base_version.id)
            && (cur_version.id - base_version.id >= version_diff as u64)
        {
            tracing::info!(
                "Collected all compaction tasks. Actual version diff {}",
                cur_version.id - base_version.id
            );
            compaction_ok = true;
            break;
        }
        if duration_cnt >= poll_timeout {
            break;
        }
        tokio::time::sleep(poll_interval).await;
        duration_cnt += poll_interval;
        cur_version = meta_client.get_current_version().await.unwrap();
    }
    (compaction_ok, cur_version)
}

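/// A pinned, boxed iterator over a read snapshot of the monitored hummock state
/// store, used to compare pre- and post-compaction scan results.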
type StateStoreIterType = Pin<
    Box<
        <<MonitoredStateStore<HummockStorage> as StateStore>::ReadSnapshot as StateStoreRead>::Iter,
    >,
>;

async fn open_hummock_iters(
    hummock: &MonitoredStateStore<HummockStorage>,
    snapshots: &[HummockEpoch],
    table_id: TableId,
) -> anyhow::Result<BTreeMap<HummockEpoch, StateStoreIterType>> {
    let mut results = BTreeMap::new();

    // Set the `table_id` as the key prefix, since the `table_id` in `ReadOptions`
    // will not be used to filter kv pairs; the key range itself must do the
    // filtering. `next_key` yields the smallest key greater than all keys with
    // this prefix, so the range covers exactly the keys of `table_id`.
    let mut buf = BytesMut::with_capacity(5);
    buf.put_u32(table_id.as_raw_id());
    let b = buf.freeze();
    let range = (
        Bound::Included(b.clone()).map(TableKey),
        Bound::Excluded(Bytes::from(risingwave_hummock_sdk::key::next_key(
            b.as_ref(),
        )))
        .map(TableKey),
    );

    for &epoch in snapshots {
        let snapshot = hummock
            .new_read_snapshot(
                HummockReadEpoch::NoWait(epoch),
                NewReadSnapshotOptions {
                    table_id,
                    table_option: TableOption::default(),
                },
            )
            .await?;
        let iter = snapshot
            .iter(
                range.clone(),
                ReadOptions {
                    cache_policy: CachePolicy::Fill(Hint::Normal),
                    ..Default::default()
                },
            )
            .await?;
        results.insert(epoch, Box::pin(iter));
    }
    Ok(results)
}

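/// Asserts that iterating the old and the new version at each checked epoch
/// yields exactly the same key-value pairs, i.e. compaction did not change the
/// data visible to readers.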
pub async fn check_compaction_results(
    version_id: HummockVersionId,
    mut expect_results: BTreeMap<HummockEpoch, StateStoreIterType>,
    mut actual_results: BTreeMap<HummockEpoch, StateStoreIterType>,
) -> anyhow::Result<()> {
    // `zip_eq_fast` panics if the two maps contain different numbers of epochs.
    let combined = expect_results
        .iter_mut()
        .zip_eq_fast(actual_results.iter_mut());
    for ((e1, expect_iter), (e2, actual_iter)) in combined {
        assert_eq!(e1, e2);
        tracing::info!(
            "Check results for version: id: {}, epoch: {}",
            version_id,
            e1,
        );
        let mut expect_cnt = 0;
        let mut actual_cnt = 0;

        while let Some(kv_expect) = expect_iter.try_next().await? {
            expect_cnt += 1;
            let ret = actual_iter.try_next().await?;
            match ret {
                None => {
                    // The actual iterator ended early; the count assertion below
                    // will catch the mismatch.
                    break;
                }
                Some(kv_actual) => {
                    actual_cnt += 1;
                    assert_eq!(kv_expect.0, kv_actual.0, "Key mismatch");
                    assert_eq!(kv_expect.1, kv_actual.1, "Value mismatch");
                }
            }
        }
        assert_eq!(expect_cnt, actual_cnt);
    }
    Ok(())
}

struct StorageMetrics {
    pub hummock_metrics: Arc<HummockMetrics>,
    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
    pub object_store_metrics: Arc<ObjectStoreMetrics>,
    pub storage_metrics: Arc<MonitoredStorageMetrics>,
    pub compactor_metrics: Arc<CompactorMetrics>,
}

pub async fn create_hummock_store_with_metrics(
    meta_client: &MetaClient,
    storage_opts: Arc<StorageOpts>,
    opts: &CompactionTestOpts,
) -> anyhow::Result<MonitoredStateStore<HummockStorage>> {
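    // Use no-op metric registries: the test tool needs the monitored store API
    // but does not report metrics anywhere.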
    let metrics = StorageMetrics {
        hummock_metrics: Arc::new(HummockMetrics::unused()),
        state_store_metrics: Arc::new(HummockStateStoreMetrics::unused()),
        object_store_metrics: Arc::new(ObjectStoreMetrics::unused()),
        storage_metrics: Arc::new(MonitoredStorageMetrics::unused()),
        compactor_metrics: Arc::new(CompactorMetrics::unused()),
    };

    let state_store_impl = StateStoreImpl::new(
        &opts.state_store,
        storage_opts,
        Arc::new(MonitoredHummockMetaClient::new(
            meta_client.clone(),
            metrics.hummock_metrics.clone(),
        )),
        metrics.state_store_metrics.clone(),
        metrics.object_store_metrics.clone(),
        metrics.storage_metrics.clone(),
        metrics.compactor_metrics.clone(),
        None,
        true,
    )
    .await?;

    if let Some(hummock_state_store) = state_store_impl.as_hummock() {
        Ok(hummock_state_store
            .clone()
            .monitored(metrics.storage_metrics))
    } else {
        Err(anyhow!("only Hummock state store is supported!"))
    }
}