risingwave_ctl/common/
hummock_service.rs1use std::env;
16use std::sync::Arc;
17use std::time::Duration;
18
19use anyhow::{Result, anyhow, bail};
20use foyer::{CacheBuilder, HybridCacheBuilder};
21use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
22use risingwave_object_store::object::build_remote_object_store;
23use risingwave_rpc_client::MetaClient;
24use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
25use risingwave_storage::hummock::none::NoneRecentFilter;
26use risingwave_storage::hummock::{HummockStorage, SstableStore, SstableStoreConfig};
27use risingwave_storage::monitor::{
28 CompactorMetrics, HummockMetrics, HummockStateStoreMetrics, MonitoredStateStore,
29 MonitoredStorageMetrics, ObjectStoreMetrics, global_hummock_state_store_metrics,
30};
31use risingwave_storage::opts::StorageOpts;
32use risingwave_storage::{StateStore, StateStoreImpl};
33use tokio::sync::oneshot::Sender;
34use tokio::task::JoinHandle;
35
36pub struct HummockServiceOpts {
37 pub hummock_url: String,
38 pub data_dir: Option<String>,
39
40 use_new_object_prefix_strategy: bool,
41
42 heartbeat_handle: Option<JoinHandle<()>>,
43 heartbeat_shutdown_sender: Option<Sender<()>>,
44}
45
46#[derive(Clone)]
47pub struct Metrics {
48 pub hummock_metrics: Arc<HummockMetrics>,
49 pub state_store_metrics: Arc<HummockStateStoreMetrics>,
50 pub object_store_metrics: Arc<ObjectStoreMetrics>,
51 pub storage_metrics: Arc<MonitoredStorageMetrics>,
52 pub compactor_metrics: Arc<CompactorMetrics>,
53}
54
55impl HummockServiceOpts {
56 pub fn from_env(
62 data_dir: Option<String>,
63 use_new_object_prefix_strategy: bool,
64 ) -> Result<Self> {
65 let hummock_url = match env::var("RW_HUMMOCK_URL") {
66 Ok(url) => {
67 if !url.starts_with("hummock+") {
68 return Err(anyhow!(
69 "only url starting with 'hummock+' is supported in risectl"
70 ));
71 }
72 tracing::info!("using Hummock URL from `RW_HUMMOCK_URL`: {}", url);
73 url
74 }
75 Err(_) => {
76 const MESSAGE: &str = "env variable `RW_HUMMOCK_URL` not found.
77 For `./risedev d` use cases, please do the following.
78 * start the cluster with shared storage:
79 - consider adding `use: minio` in the risedev config,
80 - or directly use `./risedev d for-ctl` to start the cluster.
81 * use `./risedev ctl` to use risectl.
82
83 For production use cases,
84 * please set `RW_HUMMOCK_URL` to the same address specified for the meta node.
85 ";
86 bail!(MESSAGE);
87 }
88 };
89
90 Ok(Self {
91 hummock_url,
92 data_dir,
93 heartbeat_handle: None,
94 heartbeat_shutdown_sender: None,
95 use_new_object_prefix_strategy,
96 })
97 }
98
99 fn get_storage_opts(&self) -> StorageOpts {
100 let mut opts = StorageOpts {
101 share_buffer_compaction_worker_threads_number: 0,
102 meta_cache_capacity_mb: 1,
103 block_cache_capacity_mb: 1,
104 meta_cache_shard_num: 1,
105 block_cache_shard_num: 1,
106 ..Default::default()
107 };
108
109 if let Some(dir) = &self.data_dir {
110 opts.data_directory.clone_from(dir);
111 }
112
113 opts
114 }
115
116 pub async fn create_hummock_store_with_metrics(
117 &mut self,
118 meta_client: &MetaClient,
119 ) -> Result<(MonitoredStateStore<HummockStorage>, Metrics)> {
120 let (heartbeat_handle, heartbeat_shutdown_sender) =
121 MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
122 self.heartbeat_handle = Some(heartbeat_handle);
123 self.heartbeat_shutdown_sender = Some(heartbeat_shutdown_sender);
124
125 let opts = self.get_storage_opts();
127
128 tracing::info!("using StorageOpts: {:#?}", opts);
129
130 let metrics = Metrics {
131 hummock_metrics: Arc::new(HummockMetrics::unused()),
132 state_store_metrics: Arc::new(HummockStateStoreMetrics::unused()),
133 object_store_metrics: Arc::new(ObjectStoreMetrics::unused()),
134 storage_metrics: Arc::new(MonitoredStorageMetrics::unused()),
135 compactor_metrics: Arc::new(CompactorMetrics::unused()),
136 };
137
138 let state_store_impl = StateStoreImpl::new(
139 &self.hummock_url,
140 Arc::new(opts),
141 Arc::new(MonitoredHummockMetaClient::new(
142 meta_client.clone(),
143 metrics.hummock_metrics.clone(),
144 )),
145 metrics.state_store_metrics.clone(),
146 metrics.object_store_metrics.clone(),
147 metrics.storage_metrics.clone(),
148 metrics.compactor_metrics.clone(),
149 None,
150 self.use_new_object_prefix_strategy,
151 )
152 .await?;
153
154 if let Some(hummock_state_store) = state_store_impl.as_hummock() {
155 Ok((
156 hummock_state_store
157 .clone()
158 .monitored(metrics.storage_metrics.clone()),
159 metrics,
160 ))
161 } else {
162 Err(anyhow!("only Hummock state store is supported in risectl"))
163 }
164 }
165
166 pub async fn create_sstable_store(
167 &self,
168 use_new_object_prefix_strategy: bool,
169 ) -> Result<Arc<SstableStore>> {
170 let object_store = build_remote_object_store(
171 self.hummock_url.strip_prefix("hummock+").unwrap(),
172 Arc::new(ObjectStoreMetrics::unused()),
173 "Hummock",
174 Arc::new(ObjectStoreConfig::default()),
175 )
176 .await;
177
178 let opts = self.get_storage_opts();
179
180 let meta_cache = HybridCacheBuilder::new()
181 .memory(opts.meta_cache_capacity_mb * (1 << 20))
182 .with_shards(opts.meta_cache_shard_num)
183 .storage()
184 .build()
185 .await?;
186 let block_cache = HybridCacheBuilder::new()
187 .memory(opts.block_cache_capacity_mb * (1 << 20))
188 .with_shards(opts.block_cache_shard_num)
189 .storage()
190 .build()
191 .await?;
192
193 Ok(Arc::new(SstableStore::new(SstableStoreConfig {
194 store: Arc::new(object_store),
195 path: opts.data_directory,
196 prefetch_buffer_capacity: opts.block_cache_capacity_mb * (1 << 20),
197 max_prefetch_block_number: opts.max_prefetch_block_number,
198 recent_filter: Arc::new(NoneRecentFilter::default().into()),
199 state_store_metrics: Arc::new(global_hummock_state_store_metrics(
200 MetricLevel::Disabled,
201 )),
202 use_new_object_prefix_strategy,
203 meta_cache,
204 block_cache,
205 vector_meta_cache: CacheBuilder::new(1 << 10).build(),
206 vector_block_cache: CacheBuilder::new(1 << 10).build(),
207 })))
208 }
209}