risingwave_compaction_test/
compaction_test_runner.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16use std::net::SocketAddr;
17use std::ops::Bound;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::thread::JoinHandle;
21use std::time::Duration;
22
23use anyhow::anyhow;
24use bytes::{BufMut, Bytes, BytesMut};
25use clap::Parser;
26use foyer::Hint;
27use risingwave_common::catalog::TableId;
28use risingwave_common::config::{
29    MetaConfig, NoOverride, extract_storage_memory_config, load_config,
30};
31use risingwave_common::util::addr::HostAddr;
32use risingwave_common::util::iter_util::ZipEqFast;
33use risingwave_common::util::tokio_util::sync::CancellationToken;
34use risingwave_hummock_sdk::key::TableKey;
35use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
36use risingwave_hummock_sdk::{
37    CompactionGroupId, FIRST_VERSION_ID, HummockEpoch, HummockReadEpoch, HummockVersionId,
38};
39use risingwave_pb::common::WorkerType;
40use risingwave_rpc_client::{HummockMetaClient, MetaClient};
41use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
42use risingwave_storage::hummock::{CachePolicy, HummockStorage};
43use risingwave_storage::monitor::{
44    CompactorMetrics, HummockMetrics, HummockStateStoreMetrics, MonitoredStateStore,
45    MonitoredStorageMetrics, ObjectStoreMetrics,
46};
47use risingwave_storage::opts::StorageOpts;
48use risingwave_storage::store::{NewReadSnapshotOptions, ReadOptions, StateStoreRead};
49use risingwave_storage::{StateStore, StateStoreImpl, StateStoreIter};
50
51const SST_ID_SHIFT_COUNT: u32 = 1000000;
52const CHECKPOINT_FREQ_FOR_REPLAY: u64 = 99999999;
53
54use crate::CompactionTestOpts;
55
56struct CompactionTestMetrics {
57    num_expect_check: u64,
58    num_uncheck: u64,
59}
60
61impl CompactionTestMetrics {
62    fn new() -> CompactionTestMetrics {
63        Self {
64            num_expect_check: 0,
65            num_uncheck: 0,
66        }
67    }
68}
69
70/// Steps to use the compaction test tool
71///
72/// 1. Start the cluster with ci-compaction-test config: `./risedev d ci-compaction-test`
73/// 2. Ingest enough L0 SSTs, for example we can use the tpch-bench tool
74/// 3. Disable hummock manager commit new epochs: `./risedev ctl hummock disable-commit-epoch`, and
75///    it will print the current max committed epoch in Meta.
76/// 4. Use the test tool to replay hummock version deltas and trigger compactions:
77///    `./risedev compaction-test --state-store hummock+s3://your-bucket -t <table_id>`
78pub async fn compaction_test_main(
79    _listen_addr: SocketAddr,
80    advertise_addr: HostAddr,
81    opts: CompactionTestOpts,
82) -> anyhow::Result<()> {
83    let meta_listen_addr = opts
84        .meta_address
85        .strip_prefix("http://")
86        .unwrap()
87        .to_owned();
88
89    let _meta_handle = tokio::spawn(start_meta_node(
90        meta_listen_addr.clone(),
91        opts.state_store.clone(),
92        opts.config_path_for_meta.clone(),
93    ));
94
95    // Wait for meta starts
96    tokio::time::sleep(Duration::from_secs(1)).await;
97    tracing::info!("Started embedded Meta");
98
99    let (compactor_thrd, compactor_shutdown_tx) = start_compactor_thread(
100        opts.meta_address.clone(),
101        advertise_addr.to_string(),
102        opts.config_path.clone(),
103    );
104
105    let original_meta_endpoint = "http://127.0.0.1:5690";
106    let mut table_id: u32 = opts.table_id;
107
108    init_metadata_for_replay(
109        original_meta_endpoint,
110        &opts.meta_address,
111        &advertise_addr,
112        opts.ci_mode,
113        &mut table_id,
114    )
115    .await?;
116
117    assert_ne!(0, table_id, "Invalid table_id for correctness checking");
118
119    let version_deltas = pull_version_deltas(original_meta_endpoint, &advertise_addr).await?;
120
121    tracing::info!(
122        "Pulled delta logs from Meta: len(logs): {}",
123        version_deltas.len()
124    );
125
126    let replay_thrd = start_replay_thread(opts, table_id, version_deltas);
127    replay_thrd.join().unwrap();
128    compactor_shutdown_tx.send(()).unwrap();
129    compactor_thrd.join().unwrap();
130    Ok(())
131}
132
133pub async fn start_meta_node(listen_addr: String, state_store: String, config_path: String) {
134    let meta_opts = risingwave_meta_node::MetaNodeOpts::parse_from([
135        "meta-node",
136        "--listen-addr",
137        &listen_addr,
138        "--advertise-addr",
139        &listen_addr,
140        "--backend",
141        "mem",
142        "--state-store",
143        &state_store,
144        "--config-path",
145        &config_path,
146    ]);
147    let config = load_config(&meta_opts.config_path, &meta_opts);
148    // We set a large checkpoint frequency to prevent the embedded meta node
149    // to commit new epochs to avoid bumping the hummock version during version log replay.
150    assert_eq!(
151        CHECKPOINT_FREQ_FOR_REPLAY,
152        config.system.checkpoint_frequency.unwrap()
153    );
154    assert!(
155        config.meta.enable_compaction_deterministic,
156        "enable_compaction_deterministic should be set"
157    );
158
159    risingwave_meta_node::start(meta_opts, CancellationToken::new() /* dummy */).await
160}
161
162async fn start_compactor_node(
163    meta_rpc_endpoint: String,
164    advertise_addr: String,
165    config_path: String,
166) {
167    let opts = risingwave_compactor::CompactorOpts::parse_from([
168        "compactor-node",
169        "--listen-addr",
170        "127.0.0.1:5550",
171        "--advertise-addr",
172        &advertise_addr,
173        "--meta-address",
174        &meta_rpc_endpoint,
175        "--config-path",
176        &config_path,
177    ]);
178    risingwave_compactor::start(opts, CancellationToken::new() /* dummy */).await
179}
180
181pub fn start_compactor_thread(
182    meta_endpoint: String,
183    advertise_addr: String,
184    config_path: String,
185) -> (JoinHandle<()>, std::sync::mpsc::Sender<()>) {
186    let (tx, rx) = std::sync::mpsc::channel();
187    let compact_func = move || {
188        let runtime = tokio::runtime::Builder::new_multi_thread()
189            .enable_all()
190            .build()
191            .unwrap();
192        runtime.block_on(async {
193            tokio::spawn(async {
194                tracing::info!("Starting compactor node");
195                start_compactor_node(meta_endpoint, advertise_addr, config_path).await
196            });
197            rx.recv().unwrap();
198        });
199    };
200
201    (std::thread::spawn(compact_func), tx)
202}
203
204fn start_replay_thread(
205    opts: CompactionTestOpts,
206    table_id: u32,
207    version_deltas: Vec<HummockVersionDelta>,
208) -> JoinHandle<()> {
209    let replay_func = move || {
210        let runtime = tokio::runtime::Builder::new_current_thread()
211            .enable_all()
212            .build()
213            .unwrap();
214        runtime
215            .block_on(start_replay(opts, table_id, version_deltas))
216            .expect("repaly error occurred");
217    };
218
219    std::thread::spawn(replay_func)
220}
221
222async fn init_metadata_for_replay(
223    cluster_meta_endpoint: &str,
224    new_meta_endpoint: &str,
225    advertise_addr: &HostAddr,
226    ci_mode: bool,
227    table_id: &mut u32,
228) -> anyhow::Result<()> {
229    // The compactor needs to receive catalog notification from the new Meta node,
230    // and we should wait the compactor finishes setup the subscription channel
231    // before registering the table catalog to the new Meta node. Otherwise the
232    // filter key manager will fail to acquire a key extractor.
233    tokio::time::sleep(Duration::from_secs(2)).await;
234
235    let meta_config = MetaConfig::default();
236    let meta_client: MetaClient;
237    tokio::select! {
238        _ = tokio::signal::ctrl_c() => {
239            tracing::info!("Ctrl+C received, now exiting");
240            std::process::exit(0);
241        },
242        ret = MetaClient::register_new(cluster_meta_endpoint.parse()?, WorkerType::RiseCtl, advertise_addr, Default::default(), Arc::new(meta_config.clone())) => {
243            (meta_client, _) = ret;
244        },
245    }
246    let worker_id = meta_client.worker_id();
247    tracing::info!("Assigned init worker id {}", worker_id);
248    meta_client.activate(advertise_addr).await.unwrap();
249
250    let tables = meta_client.risectl_list_state_tables().await?;
251
252    let (new_meta_client, _) = MetaClient::register_new(
253        new_meta_endpoint.parse()?,
254        WorkerType::RiseCtl,
255        advertise_addr,
256        Default::default(),
257        Arc::new(meta_config.clone()),
258    )
259    .await;
260    new_meta_client.activate(advertise_addr).await.unwrap();
261    if ci_mode {
262        let table_to_check = tables.iter().find(|t| t.name == "nexmark_q7").unwrap();
263        *table_id = table_to_check.id;
264    }
265
266    // No need to init compaction_groups, because it will be done when replaying version delta.
267    new_meta_client
268        .init_metadata_for_replay(tables, vec![])
269        .await?;
270
271    // shift the sst id to avoid conflict with the original meta node
272    let _ = new_meta_client
273        .get_new_object_ids(SST_ID_SHIFT_COUNT)
274        .await?;
275
276    tracing::info!("Finished initializing the new Meta");
277    Ok(())
278}
279
280async fn pull_version_deltas(
281    cluster_meta_endpoint: &str,
282    advertise_addr: &HostAddr,
283) -> anyhow::Result<Vec<HummockVersionDelta>> {
284    // Register to the cluster.
285    // We reuse the RiseCtl worker type here
286    let (meta_client, _) = MetaClient::register_new(
287        cluster_meta_endpoint.parse()?,
288        WorkerType::RiseCtl,
289        advertise_addr,
290        Default::default(),
291        Arc::new(MetaConfig::default()),
292    )
293    .await;
294    let worker_id = meta_client.worker_id();
295    tracing::info!("Assigned pull worker id {}", worker_id);
296    meta_client.activate(advertise_addr).await.unwrap();
297
298    let (handle, shutdown_tx) =
299        MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
300    let res = meta_client
301        .list_version_deltas(HummockVersionId::new(0), u32::MAX, u64::MAX)
302        .await
303        .unwrap();
304
305    if let Err(err) = shutdown_tx.send(()) {
306        tracing::warn!("Failed to send shutdown to heartbeat task: {:?}", err);
307    }
308    handle.await?;
309    tracing::info!("Shutdown the pull worker");
310    Ok(res)
311}
312
313async fn start_replay(
314    opts: CompactionTestOpts,
315    table_to_check: u32,
316    version_delta_logs: Vec<HummockVersionDelta>,
317) -> anyhow::Result<()> {
318    let advertise_addr = "127.0.0.1:7770".parse().unwrap();
319    tracing::info!(
320        "Start to replay. Advertise address is {}, Table id {}",
321        advertise_addr,
322        table_to_check
323    );
324
325    let mut metric = CompactionTestMetrics::new();
326    let config = load_config(&opts.config_path_for_meta, NoOverride);
327    tracing::info!(
328        "Starting replay with config {:?} and opts {:?}",
329        config,
330        opts
331    );
332
333    // Register to the cluster.
334    // We reuse the RiseCtl worker type here
335    let (meta_client, system_params) = MetaClient::register_new(
336        opts.meta_address.parse()?,
337        WorkerType::RiseCtl,
338        &advertise_addr,
339        Default::default(),
340        Arc::new(config.meta.clone()),
341    )
342    .await;
343    let worker_id = meta_client.worker_id();
344    tracing::info!("Assigned replay worker id {}", worker_id);
345    meta_client.activate(&advertise_addr).await.unwrap();
346
347    let sub_tasks = vec![MetaClient::start_heartbeat_loop(
348        meta_client.clone(),
349        Duration::from_millis(1000),
350    )];
351
352    // Prevent the embedded meta to commit new epochs during version replay
353    let latest_version = meta_client.disable_commit_epoch().await?;
354    assert_eq!(FIRST_VERSION_ID, latest_version.id);
355    // The new meta should not have any data at this time
356    for level in latest_version.levels.values() {
357        level.levels.iter().for_each(|lvl| {
358            assert!(lvl.table_infos.is_empty());
359            assert_eq!(0, lvl.total_file_size);
360        });
361    }
362
363    // Creates a hummock state store *after* we reset the hummock version
364    let storage_memory_config = extract_storage_memory_config(&config);
365    let storage_opts = Arc::new(StorageOpts::from((
366        &config,
367        &system_params,
368        &storage_memory_config,
369    )));
370    let hummock = create_hummock_store_with_metrics(&meta_client, storage_opts, &opts).await?;
371
372    // Replay version deltas from FIRST_VERSION_ID to the version before reset
373    let mut modified_compaction_groups = HashSet::<CompactionGroupId>::new();
374    let mut replay_count: u64 = 0;
375    let mut replayed_epochs = vec![];
376    let mut check_result_task: Option<tokio::task::JoinHandle<_>> = None;
377
378    for delta in version_delta_logs {
379        let (current_version, compaction_groups) = meta_client.replay_version_delta(delta).await?;
380        let (version_id, committed_epoch) = (
381            current_version.id,
382            current_version
383                .table_committed_epoch(table_to_check.into())
384                .unwrap_or_default(),
385        );
386        tracing::info!(
387            "Replayed version delta version_id: {}, committed_epoch: {}, compaction_groups: {:?}",
388            version_id,
389            committed_epoch,
390            compaction_groups
391        );
392
393        hummock
394            .inner()
395            .update_version_and_wait(current_version.clone())
396            .await;
397
398        replay_count += 1;
399        replayed_epochs.push(committed_epoch);
400        compaction_groups
401            .into_iter()
402            .map(|c| modified_compaction_groups.insert(c))
403            .count();
404
405        // We can custom more conditions for compaction triggering
406        // For now I just use a static way here
407        if replay_count.is_multiple_of(opts.num_trigger_frequency)
408            && !modified_compaction_groups.is_empty()
409        {
410            // join previously spawned check result task
411            if let Some(handle) = check_result_task {
412                handle.await??;
413            }
414
415            metric.num_expect_check += 1;
416
417            // pop the latest epoch
418            replayed_epochs.pop();
419            let mut epochs = vec![committed_epoch];
420            epochs.extend(pin_old_snapshots(&meta_client, &replayed_epochs, 1).into_iter());
421            tracing::info!("===== Prepare to check snapshots: {:?}", epochs);
422
423            let old_version_iters = open_hummock_iters(&hummock, &epochs, table_to_check).await?;
424
425            tracing::info!(
426                "Trigger compaction for version {}, epoch {} compaction_groups: {:?}",
427                version_id,
428                committed_epoch,
429                modified_compaction_groups,
430            );
431            // Try trigger multiple rounds of compactions but doesn't wait for finish
432            let is_multi_round = opts.num_trigger_rounds > 1;
433            for _ in 0..opts.num_trigger_rounds {
434                meta_client
435                    .trigger_compaction_deterministic(
436                        version_id,
437                        Vec::from_iter(modified_compaction_groups.iter().copied()),
438                    )
439                    .await?;
440                if is_multi_round {
441                    tokio::time::sleep(Duration::from_millis(50)).await;
442                }
443            }
444
445            let old_task_num = meta_client.get_assigned_compact_task_num().await?;
446            // Poll for compaction task status
447            let (schedule_ok, version_diff) =
448                poll_compaction_schedule_status(&meta_client, old_task_num).await;
449
450            tracing::info!(
451                "Compaction schedule_ok {}, version_diff {}",
452                schedule_ok,
453                version_diff,
454            );
455            let (compaction_ok, new_version) = poll_compaction_tasks_status(
456                &meta_client,
457                schedule_ok,
458                version_diff as u32,
459                &current_version,
460            )
461            .await;
462
463            tracing::info!(
464                "Compaction schedule_ok {}, version_diff {} compaction_ok {}",
465                schedule_ok,
466                version_diff,
467                compaction_ok,
468            );
469
470            let new_version_id = new_version.id;
471            assert!(
472                new_version_id >= version_id,
473                "new_version_id: {}",
474                new_version_id,
475            );
476
477            if new_version_id != version_id {
478                hummock.inner().update_version_and_wait(new_version).await;
479
480                let new_version_iters =
481                    open_hummock_iters(&hummock, &epochs, table_to_check).await?;
482
483                // spawn a task to check the results
484                check_result_task = Some(tokio::spawn(check_compaction_results(
485                    new_version_id,
486                    old_version_iters,
487                    new_version_iters,
488                )));
489            } else {
490                check_result_task = None;
491                metric.num_uncheck += 1;
492            }
493            modified_compaction_groups.clear();
494            replayed_epochs.clear();
495        }
496    }
497
498    // join previously spawned check result task if any
499    if let Some(handle) = check_result_task {
500        handle.await??;
501    }
502    tracing::info!(
503        "Replay finished. Expect check count: {}, actual check count: {}",
504        metric.num_expect_check,
505        metric.num_expect_check - metric.num_uncheck
506    );
507
508    assert_ne!(0, metric.num_expect_check - metric.num_uncheck);
509
510    for (join_handle, shutdown_sender) in sub_tasks {
511        if let Err(err) = shutdown_sender.send(()) {
512            tracing::warn!("Failed to send shutdown: {:?}", err);
513            continue;
514        }
515        if let Err(err) = join_handle.await {
516            tracing::warn!("Failed to join shutdown: {:?}", err);
517        }
518    }
519
520    Ok(())
521}
522
523fn pin_old_snapshots(
524    _meta_client: &MetaClient,
525    replayed_epochs: &[HummockEpoch],
526    num: usize,
527) -> Vec<HummockEpoch> {
528    let mut old_epochs = vec![];
529    for &epoch in replayed_epochs.iter().rev().take(num) {
530        old_epochs.push(epoch);
531    }
532    old_epochs
533}
534
535/// Poll the compaction task assignment to aware whether scheduling is success.
536/// Returns (whether scheduling is success, expected number of new versions)
537async fn poll_compaction_schedule_status(
538    meta_client: &MetaClient,
539    old_task_num: usize,
540) -> (bool, i32) {
541    let poll_timeout = Duration::from_secs(2);
542    let poll_interval = Duration::from_millis(20);
543    let mut poll_duration_cnt = Duration::from_millis(0);
544    let mut new_task_num = meta_client.get_assigned_compact_task_num().await.unwrap();
545    let mut schedule_ok = false;
546    loop {
547        // New task has been scheduled
548        if new_task_num > old_task_num {
549            schedule_ok = true;
550            break;
551        }
552
553        if poll_duration_cnt >= poll_timeout {
554            break;
555        }
556        tokio::time::sleep(poll_interval).await;
557        poll_duration_cnt += poll_interval;
558        new_task_num = meta_client.get_assigned_compact_task_num().await.unwrap();
559    }
560    (
561        schedule_ok,
562        (new_task_num as i32 - old_task_num as i32).abs(),
563    )
564}
565
566async fn poll_compaction_tasks_status(
567    meta_client: &MetaClient,
568    schedule_ok: bool,
569    version_diff: u32,
570    base_version: &HummockVersion,
571) -> (bool, HummockVersion) {
572    // Polls current version to check whether its id become large,
573    // which means compaction tasks have finished. If schedule ok,
574    // we poll for a little long while.
575    let poll_timeout = if schedule_ok {
576        Duration::from_secs(120)
577    } else {
578        Duration::from_secs(5)
579    };
580    let poll_interval = Duration::from_millis(50);
581    let mut duration_cnt = Duration::from_millis(0);
582    let mut compaction_ok = false;
583
584    let mut cur_version = meta_client.get_current_version().await.unwrap();
585    loop {
586        if (cur_version.id > base_version.id)
587            && (cur_version.id - base_version.id >= version_diff as u64)
588        {
589            tracing::info!(
590                "Collected all of compact tasks. Actual version diff {}",
591                cur_version.id - base_version.id
592            );
593            compaction_ok = true;
594            break;
595        }
596        if duration_cnt >= poll_timeout {
597            break;
598        }
599        tokio::time::sleep(poll_interval).await;
600        duration_cnt += poll_interval;
601        cur_version = meta_client.get_current_version().await.unwrap();
602    }
603    (compaction_ok, cur_version)
604}
605
606type StateStoreIterType = Pin<
607    Box<
608        <<MonitoredStateStore<HummockStorage> as StateStore>::ReadSnapshot as StateStoreRead>::Iter,
609    >,
610>;
611
612async fn open_hummock_iters(
613    hummock: &MonitoredStateStore<HummockStorage>,
614    snapshots: &[HummockEpoch],
615    table_id: u32,
616) -> anyhow::Result<BTreeMap<HummockEpoch, StateStoreIterType>> {
617    let mut results = BTreeMap::new();
618
619    // Set the `table_id` to the prefix of key, since the table_id in
620    // the `ReadOptions` will not be used to filter kv pairs
621    let mut buf = BytesMut::with_capacity(5);
622    buf.put_u32(table_id);
623    let b = buf.freeze();
624    let range = (
625        Bound::Included(b.clone()).map(TableKey),
626        Bound::Excluded(Bytes::from(risingwave_hummock_sdk::key::next_key(
627            b.as_ref(),
628        )))
629        .map(TableKey),
630    );
631
632    for &epoch in snapshots {
633        let snapshot = hummock
634            .new_read_snapshot(
635                HummockReadEpoch::NoWait(epoch),
636                NewReadSnapshotOptions {
637                    table_id: TableId { table_id },
638                },
639            )
640            .await?;
641        let iter = snapshot
642            .iter(
643                range.clone(),
644                ReadOptions {
645                    cache_policy: CachePolicy::Fill(Hint::Normal),
646                    ..Default::default()
647                },
648            )
649            .await?;
650        results.insert(epoch, Box::pin(iter));
651    }
652    Ok(results)
653}
654
655pub async fn check_compaction_results(
656    version_id: HummockVersionId,
657    mut expect_results: BTreeMap<HummockEpoch, StateStoreIterType>,
658    mut actual_results: BTreeMap<HummockEpoch, StateStoreIterType>,
659) -> anyhow::Result<()> {
660    let combined = expect_results
661        .iter_mut()
662        .zip_eq_fast(actual_results.iter_mut());
663    for ((e1, expect_iter), (e2, actual_iter)) in combined {
664        assert_eq!(e1, e2);
665        tracing::info!(
666            "Check results for version: id: {}, epoch: {}",
667            version_id,
668            e1,
669        );
670        let mut expect_cnt = 0;
671        let mut actual_cnt = 0;
672
673        while let Some(kv_expect) = expect_iter.try_next().await? {
674            expect_cnt += 1;
675            let ret = actual_iter.try_next().await?;
676            match ret {
677                None => {
678                    break;
679                }
680                Some(kv_actual) => {
681                    actual_cnt += 1;
682                    assert_eq!(kv_expect.0, kv_actual.0, "Key mismatch");
683                    assert_eq!(kv_expect.1, kv_actual.1, "Value mismatch");
684                }
685            }
686        }
687        assert_eq!(expect_cnt, actual_cnt);
688    }
689    Ok(())
690}
691
692struct StorageMetrics {
693    pub hummock_metrics: Arc<HummockMetrics>,
694    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
695    pub object_store_metrics: Arc<ObjectStoreMetrics>,
696    pub storage_metrics: Arc<MonitoredStorageMetrics>,
697    pub compactor_metrics: Arc<CompactorMetrics>,
698}
699
700pub async fn create_hummock_store_with_metrics(
701    meta_client: &MetaClient,
702    storage_opts: Arc<StorageOpts>,
703    opts: &CompactionTestOpts,
704) -> anyhow::Result<MonitoredStateStore<HummockStorage>> {
705    let metrics = StorageMetrics {
706        hummock_metrics: Arc::new(HummockMetrics::unused()),
707        state_store_metrics: Arc::new(HummockStateStoreMetrics::unused()),
708        object_store_metrics: Arc::new(ObjectStoreMetrics::unused()),
709        storage_metrics: Arc::new(MonitoredStorageMetrics::unused()),
710        compactor_metrics: Arc::new(CompactorMetrics::unused()),
711    };
712
713    let state_store_impl = StateStoreImpl::new(
714        &opts.state_store,
715        storage_opts,
716        Arc::new(MonitoredHummockMetaClient::new(
717            meta_client.clone(),
718            metrics.hummock_metrics.clone(),
719        )),
720        metrics.state_store_metrics.clone(),
721        metrics.object_store_metrics.clone(),
722        metrics.storage_metrics.clone(),
723        metrics.compactor_metrics.clone(),
724        None,
725        true,
726    )
727    .await?;
728
729    if let Some(hummock_state_store) = state_store_impl.as_hummock() {
730        Ok(hummock_state_store
731            .clone()
732            .monitored(metrics.storage_metrics))
733    } else {
734        Err(anyhow!("only Hummock state store is supported!"))
735    }
736}