risingwave_ctl/common/
hummock_service.rs1use std::env;
16use std::sync::Arc;
17use std::time::Duration;
18
19use anyhow::{Result, anyhow, bail};
20use foyer::{Engine, HybridCacheBuilder};
21use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
22use risingwave_object_store::object::build_remote_object_store;
23use risingwave_rpc_client::MetaClient;
24use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
25use risingwave_storage::hummock::{HummockStorage, SstableStore, SstableStoreConfig};
26use risingwave_storage::monitor::{
27 CompactorMetrics, HummockMetrics, HummockStateStoreMetrics, MonitoredStateStore,
28 MonitoredStorageMetrics, ObjectStoreMetrics, global_hummock_state_store_metrics,
29};
30use risingwave_storage::opts::StorageOpts;
31use risingwave_storage::{StateStore, StateStoreImpl};
32use tokio::sync::oneshot::Sender;
33use tokio::task::JoinHandle;
34
35pub struct HummockServiceOpts {
36 pub hummock_url: String,
37 pub data_dir: Option<String>,
38
39 use_new_object_prefix_strategy: bool,
40
41 heartbeat_handle: Option<JoinHandle<()>>,
42 heartbeat_shutdown_sender: Option<Sender<()>>,
43}
44
45#[derive(Clone)]
46pub struct Metrics {
47 pub hummock_metrics: Arc<HummockMetrics>,
48 pub state_store_metrics: Arc<HummockStateStoreMetrics>,
49 pub object_store_metrics: Arc<ObjectStoreMetrics>,
50 pub storage_metrics: Arc<MonitoredStorageMetrics>,
51 pub compactor_metrics: Arc<CompactorMetrics>,
52}
53
54impl HummockServiceOpts {
55 pub fn from_env(
61 data_dir: Option<String>,
62 use_new_object_prefix_strategy: bool,
63 ) -> Result<Self> {
64 let hummock_url = match env::var("RW_HUMMOCK_URL") {
65 Ok(url) => {
66 if !url.starts_with("hummock+") {
67 return Err(anyhow!(
68 "only url starting with 'hummock+' is supported in risectl"
69 ));
70 }
71 tracing::info!("using Hummock URL from `RW_HUMMOCK_URL`: {}", url);
72 url
73 }
74 Err(_) => {
75 const MESSAGE: &str = "env variable `RW_HUMMOCK_URL` not found.
76 For `./risedev d` use cases, please do the following.
77 * start the cluster with shared storage:
78 - consider adding `use: minio` in the risedev config,
79 - or directly use `./risedev d for-ctl` to start the cluster.
80 * use `./risedev ctl` to use risectl.
81
82 For `./risedev apply-compose-deploy` users,
83 * `RW_HUMMOCK_URL` will be printed out when deploying. Please copy the bash exports to your console.
84 ";
85 bail!(MESSAGE);
86 }
87 };
88
89 Ok(Self {
90 hummock_url,
91 data_dir,
92 heartbeat_handle: None,
93 heartbeat_shutdown_sender: None,
94 use_new_object_prefix_strategy,
95 })
96 }
97
98 fn get_storage_opts(&self) -> StorageOpts {
99 let mut opts = StorageOpts {
100 share_buffer_compaction_worker_threads_number: 0,
101 meta_cache_capacity_mb: 1,
102 block_cache_capacity_mb: 1,
103 meta_cache_shard_num: 1,
104 block_cache_shard_num: 1,
105 ..Default::default()
106 };
107
108 if let Some(dir) = &self.data_dir {
109 opts.data_directory.clone_from(dir);
110 }
111
112 opts
113 }
114
115 pub async fn create_hummock_store_with_metrics(
116 &mut self,
117 meta_client: &MetaClient,
118 ) -> Result<(MonitoredStateStore<HummockStorage>, Metrics)> {
119 let (heartbeat_handle, heartbeat_shutdown_sender) =
120 MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
121 self.heartbeat_handle = Some(heartbeat_handle);
122 self.heartbeat_shutdown_sender = Some(heartbeat_shutdown_sender);
123
124 let opts = self.get_storage_opts();
126
127 tracing::info!("using StorageOpts: {:#?}", opts);
128
129 let metrics = Metrics {
130 hummock_metrics: Arc::new(HummockMetrics::unused()),
131 state_store_metrics: Arc::new(HummockStateStoreMetrics::unused()),
132 object_store_metrics: Arc::new(ObjectStoreMetrics::unused()),
133 storage_metrics: Arc::new(MonitoredStorageMetrics::unused()),
134 compactor_metrics: Arc::new(CompactorMetrics::unused()),
135 };
136
137 let state_store_impl = StateStoreImpl::new(
138 &self.hummock_url,
139 Arc::new(opts),
140 Arc::new(MonitoredHummockMetaClient::new(
141 meta_client.clone(),
142 metrics.hummock_metrics.clone(),
143 )),
144 metrics.state_store_metrics.clone(),
145 metrics.object_store_metrics.clone(),
146 metrics.storage_metrics.clone(),
147 metrics.compactor_metrics.clone(),
148 None,
149 self.use_new_object_prefix_strategy,
150 )
151 .await?;
152
153 if let Some(hummock_state_store) = state_store_impl.as_hummock() {
154 Ok((
155 hummock_state_store
156 .clone()
157 .monitored(metrics.storage_metrics.clone()),
158 metrics,
159 ))
160 } else {
161 Err(anyhow!("only Hummock state store is supported in risectl"))
162 }
163 }
164
165 pub async fn create_sstable_store(
166 &self,
167 use_new_object_prefix_strategy: bool,
168 ) -> Result<Arc<SstableStore>> {
169 let object_store = build_remote_object_store(
170 self.hummock_url.strip_prefix("hummock+").unwrap(),
171 Arc::new(ObjectStoreMetrics::unused()),
172 "Hummock",
173 Arc::new(ObjectStoreConfig::default()),
174 )
175 .await;
176
177 let opts = self.get_storage_opts();
178
179 let meta_cache = HybridCacheBuilder::new()
180 .memory(opts.meta_cache_capacity_mb * (1 << 20))
181 .with_shards(opts.meta_cache_shard_num)
182 .storage(Engine::Large)
183 .build()
184 .await?;
185 let block_cache = HybridCacheBuilder::new()
186 .memory(opts.block_cache_capacity_mb * (1 << 20))
187 .with_shards(opts.block_cache_shard_num)
188 .storage(Engine::Large)
189 .build()
190 .await?;
191
192 Ok(Arc::new(SstableStore::new(SstableStoreConfig {
193 store: Arc::new(object_store),
194 path: opts.data_directory,
195 prefetch_buffer_capacity: opts.block_cache_capacity_mb * (1 << 20),
196 max_prefetch_block_number: opts.max_prefetch_block_number,
197 recent_filter: None,
198 state_store_metrics: Arc::new(global_hummock_state_store_metrics(
199 MetricLevel::Disabled,
200 )),
201 use_new_object_prefix_strategy,
202 meta_cache,
203 block_cache,
204 })))
205 }
206}