risingwave_ctl/common/
hummock_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::sync::Arc;
17use std::time::Duration;
18
19use anyhow::{Result, anyhow, bail};
20use foyer::{Engine, HybridCacheBuilder};
21use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
22use risingwave_object_store::object::build_remote_object_store;
23use risingwave_rpc_client::MetaClient;
24use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
25use risingwave_storage::hummock::{HummockStorage, SstableStore, SstableStoreConfig};
26use risingwave_storage::monitor::{
27    CompactorMetrics, HummockMetrics, HummockStateStoreMetrics, MonitoredStateStore,
28    MonitoredStorageMetrics, ObjectStoreMetrics, global_hummock_state_store_metrics,
29};
30use risingwave_storage::opts::StorageOpts;
31use risingwave_storage::{StateStore, StateStoreImpl};
32use tokio::sync::oneshot::Sender;
33use tokio::task::JoinHandle;
34
35pub struct HummockServiceOpts {
36    pub hummock_url: String,
37    pub data_dir: Option<String>,
38
39    use_new_object_prefix_strategy: bool,
40
41    heartbeat_handle: Option<JoinHandle<()>>,
42    heartbeat_shutdown_sender: Option<Sender<()>>,
43}
44
45#[derive(Clone)]
46pub struct Metrics {
47    pub hummock_metrics: Arc<HummockMetrics>,
48    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
49    pub object_store_metrics: Arc<ObjectStoreMetrics>,
50    pub storage_metrics: Arc<MonitoredStorageMetrics>,
51    pub compactor_metrics: Arc<CompactorMetrics>,
52}
53
54impl HummockServiceOpts {
55    /// Recover hummock service options from env variable
56    ///
57    /// Currently, we will read these variables for meta:
58    ///
59    /// * `RW_HUMMOCK_URL`: hummock store address
60    pub fn from_env(
61        data_dir: Option<String>,
62        use_new_object_prefix_strategy: bool,
63    ) -> Result<Self> {
64        let hummock_url = match env::var("RW_HUMMOCK_URL") {
65            Ok(url) => {
66                if !url.starts_with("hummock+") {
67                    return Err(anyhow!(
68                        "only url starting with 'hummock+' is supported in risectl"
69                    ));
70                }
71                tracing::info!("using Hummock URL from `RW_HUMMOCK_URL`: {}", url);
72                url
73            }
74            Err(_) => {
75                const MESSAGE: &str = "env variable `RW_HUMMOCK_URL` not found.
76                    For `./risedev d` use cases, please do the following.
77                    * start the cluster with shared storage:
78                    - consider adding `use: minio` in the risedev config,
79                    - or directly use `./risedev d for-ctl` to start the cluster.
80                    * use `./risedev ctl` to use risectl.
81
82                    For `./risedev apply-compose-deploy` users,
83                    * `RW_HUMMOCK_URL` will be printed out when deploying. Please copy the bash exports to your console.
84                ";
85                bail!(MESSAGE);
86            }
87        };
88
89        Ok(Self {
90            hummock_url,
91            data_dir,
92            heartbeat_handle: None,
93            heartbeat_shutdown_sender: None,
94            use_new_object_prefix_strategy,
95        })
96    }
97
98    fn get_storage_opts(&self) -> StorageOpts {
99        let mut opts = StorageOpts {
100            share_buffer_compaction_worker_threads_number: 0,
101            meta_cache_capacity_mb: 1,
102            block_cache_capacity_mb: 1,
103            meta_cache_shard_num: 1,
104            block_cache_shard_num: 1,
105            ..Default::default()
106        };
107
108        if let Some(dir) = &self.data_dir {
109            opts.data_directory.clone_from(dir);
110        }
111
112        opts
113    }
114
115    pub async fn create_hummock_store_with_metrics(
116        &mut self,
117        meta_client: &MetaClient,
118    ) -> Result<(MonitoredStateStore<HummockStorage>, Metrics)> {
119        let (heartbeat_handle, heartbeat_shutdown_sender) =
120            MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
121        self.heartbeat_handle = Some(heartbeat_handle);
122        self.heartbeat_shutdown_sender = Some(heartbeat_shutdown_sender);
123
124        // FIXME: allow specify custom config
125        let opts = self.get_storage_opts();
126
127        tracing::info!("using StorageOpts: {:#?}", opts);
128
129        let metrics = Metrics {
130            hummock_metrics: Arc::new(HummockMetrics::unused()),
131            state_store_metrics: Arc::new(HummockStateStoreMetrics::unused()),
132            object_store_metrics: Arc::new(ObjectStoreMetrics::unused()),
133            storage_metrics: Arc::new(MonitoredStorageMetrics::unused()),
134            compactor_metrics: Arc::new(CompactorMetrics::unused()),
135        };
136
137        let state_store_impl = StateStoreImpl::new(
138            &self.hummock_url,
139            Arc::new(opts),
140            Arc::new(MonitoredHummockMetaClient::new(
141                meta_client.clone(),
142                metrics.hummock_metrics.clone(),
143            )),
144            metrics.state_store_metrics.clone(),
145            metrics.object_store_metrics.clone(),
146            metrics.storage_metrics.clone(),
147            metrics.compactor_metrics.clone(),
148            None,
149            self.use_new_object_prefix_strategy,
150        )
151        .await?;
152
153        if let Some(hummock_state_store) = state_store_impl.as_hummock() {
154            Ok((
155                hummock_state_store
156                    .clone()
157                    .monitored(metrics.storage_metrics.clone()),
158                metrics,
159            ))
160        } else {
161            Err(anyhow!("only Hummock state store is supported in risectl"))
162        }
163    }
164
165    pub async fn create_sstable_store(
166        &self,
167        use_new_object_prefix_strategy: bool,
168    ) -> Result<Arc<SstableStore>> {
169        let object_store = build_remote_object_store(
170            self.hummock_url.strip_prefix("hummock+").unwrap(),
171            Arc::new(ObjectStoreMetrics::unused()),
172            "Hummock",
173            Arc::new(ObjectStoreConfig::default()),
174        )
175        .await;
176
177        let opts = self.get_storage_opts();
178
179        let meta_cache = HybridCacheBuilder::new()
180            .memory(opts.meta_cache_capacity_mb * (1 << 20))
181            .with_shards(opts.meta_cache_shard_num)
182            .storage(Engine::Large)
183            .build()
184            .await?;
185        let block_cache = HybridCacheBuilder::new()
186            .memory(opts.block_cache_capacity_mb * (1 << 20))
187            .with_shards(opts.block_cache_shard_num)
188            .storage(Engine::Large)
189            .build()
190            .await?;
191
192        Ok(Arc::new(SstableStore::new(SstableStoreConfig {
193            store: Arc::new(object_store),
194            path: opts.data_directory,
195            prefetch_buffer_capacity: opts.block_cache_capacity_mb * (1 << 20),
196            max_prefetch_block_number: opts.max_prefetch_block_number,
197            recent_filter: None,
198            state_store_metrics: Arc::new(global_hummock_state_store_metrics(
199                MetricLevel::Disabled,
200            )),
201            use_new_object_prefix_strategy,
202            meta_cache,
203            block_cache,
204        })))
205    }
206}