risingwave_ctl/common/
context.rs1use risingwave_rpc_client::MetaClient;
16use risingwave_storage::hummock::HummockStorage;
17use risingwave_storage::monitor::MonitoredStateStore;
18use tokio::sync::OnceCell;
19
20use crate::common::hummock_service::{HummockServiceOpts, Metrics};
21use crate::common::meta_service::MetaServiceOpts;
22
23#[derive(Default)]
24pub struct CtlContext {
25 meta_client: OnceCell<MetaClient>,
26 hummock: OnceCell<(MonitoredStateStore<HummockStorage>, Metrics)>,
27}
28
29impl CtlContext {
30 pub async fn meta_client(&self) -> anyhow::Result<MetaClient> {
31 self.meta_client
32 .get_or_try_init(|| async {
33 let meta_opts = MetaServiceOpts::from_env()?;
34 meta_opts.create_meta_client().await
35 })
36 .await
37 .cloned()
38 }
39
40 pub async fn hummock_store(
41 &self,
42 hummock_opts: HummockServiceOpts,
43 ) -> anyhow::Result<MonitoredStateStore<HummockStorage>> {
44 let (hummock, _) = self.hummock_store_with_metrics(hummock_opts).await?;
45 Ok(hummock)
46 }
47
48 pub async fn hummock_store_with_metrics(
49 &self,
50 mut hummock_opts: HummockServiceOpts,
51 ) -> anyhow::Result<(MonitoredStateStore<HummockStorage>, Metrics)> {
52 let meta_client = self.meta_client().await?;
53 self.hummock
54 .get_or_try_init(|| async {
55 hummock_opts
56 .create_hummock_store_with_metrics(&meta_client)
57 .await
58 })
59 .await
60 .cloned()
61 }
62
63 pub async fn try_close(&self) {
64 tracing::info!("clean up context");
65 if let Some(meta_client) = self.meta_client.get() {
66 meta_client.try_unregister().await;
67 }
68 }
69}