risingwave_ctl/common/
context.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_rpc_client::MetaClient;
16use risingwave_storage::hummock::HummockStorage;
17use risingwave_storage::monitor::MonitoredStateStore;
18use tokio::sync::OnceCell;
19
20use crate::common::hummock_service::{HummockServiceOpts, Metrics};
21use crate::common::meta_service::MetaServiceOpts;
22
23#[derive(Default)]
24pub struct CtlContext {
25    meta_client: OnceCell<MetaClient>,
26    hummock: OnceCell<(MonitoredStateStore<HummockStorage>, Metrics)>,
27}
28
29impl CtlContext {
30    pub async fn meta_client(&self) -> anyhow::Result<MetaClient> {
31        self.meta_client
32            .get_or_try_init(|| async {
33                let meta_opts = MetaServiceOpts::from_env()?;
34                meta_opts.create_meta_client().await
35            })
36            .await
37            .cloned()
38    }
39
40    pub async fn hummock_store(
41        &self,
42        hummock_opts: HummockServiceOpts,
43    ) -> anyhow::Result<MonitoredStateStore<HummockStorage>> {
44        let (hummock, _) = self.hummock_store_with_metrics(hummock_opts).await?;
45        Ok(hummock)
46    }
47
48    pub async fn hummock_store_with_metrics(
49        &self,
50        mut hummock_opts: HummockServiceOpts,
51    ) -> anyhow::Result<(MonitoredStateStore<HummockStorage>, Metrics)> {
52        let meta_client = self.meta_client().await?;
53        self.hummock
54            .get_or_try_init(|| async {
55                hummock_opts
56                    .create_hummock_store_with_metrics(&meta_client)
57                    .await
58            })
59            .await
60            .cloned()
61    }
62
63    pub async fn try_close(&self) {
64        tracing::info!("clean up context");
65        if let Some(meta_client) = self.meta_client.get() {
66            meta_client.try_unregister().await;
67        }
68    }
69}