risingwave_ctl/common/
hummock_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::sync::Arc;
17use std::time::Duration;
18
19use anyhow::{Result, anyhow, bail};
20use foyer::{CacheBuilder, HybridCacheBuilder};
21use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
22use risingwave_object_store::object::build_remote_object_store;
23use risingwave_rpc_client::MetaClient;
24use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
25use risingwave_storage::hummock::none::NoneRecentFilter;
26use risingwave_storage::hummock::{HummockStorage, SstableStore, SstableStoreConfig};
27use risingwave_storage::monitor::{
28    CompactorMetrics, HummockMetrics, HummockStateStoreMetrics, MonitoredStateStore,
29    MonitoredStorageMetrics, ObjectStoreMetrics, global_hummock_state_store_metrics,
30};
31use risingwave_storage::opts::StorageOpts;
32use risingwave_storage::{StateStore, StateStoreImpl};
33use tokio::sync::oneshot::Sender;
34use tokio::task::JoinHandle;
35
36pub struct HummockServiceOpts {
37    pub hummock_url: String,
38    pub data_dir: Option<String>,
39
40    use_new_object_prefix_strategy: bool,
41
42    heartbeat_handle: Option<JoinHandle<()>>,
43    heartbeat_shutdown_sender: Option<Sender<()>>,
44}
45
46#[derive(Clone)]
47pub struct Metrics {
48    pub hummock_metrics: Arc<HummockMetrics>,
49    pub state_store_metrics: Arc<HummockStateStoreMetrics>,
50    pub object_store_metrics: Arc<ObjectStoreMetrics>,
51    pub storage_metrics: Arc<MonitoredStorageMetrics>,
52    pub compactor_metrics: Arc<CompactorMetrics>,
53}
54
55impl HummockServiceOpts {
56    /// Recover hummock service options from env variable
57    ///
58    /// Currently, we will read these variables for meta:
59    ///
60    /// * `RW_HUMMOCK_URL`: hummock store address
61    pub fn from_env(
62        data_dir: Option<String>,
63        use_new_object_prefix_strategy: bool,
64    ) -> Result<Self> {
65        let hummock_url = match env::var("RW_HUMMOCK_URL") {
66            Ok(url) => {
67                if !url.starts_with("hummock+") {
68                    return Err(anyhow!(
69                        "only url starting with 'hummock+' is supported in risectl"
70                    ));
71                }
72                tracing::info!("using Hummock URL from `RW_HUMMOCK_URL`: {}", url);
73                url
74            }
75            Err(_) => {
76                const MESSAGE: &str = "env variable `RW_HUMMOCK_URL` not found.
77                    For `./risedev d` use cases, please do the following.
78                    * start the cluster with shared storage:
79                    - consider adding `use: minio` in the risedev config,
80                    - or directly use `./risedev d for-ctl` to start the cluster.
81                    * use `./risedev ctl` to use risectl.
82
83                    For production use cases,
84                    * please set `RW_HUMMOCK_URL` to the same address specified for the meta node.
85                ";
86                bail!(MESSAGE);
87            }
88        };
89
90        Ok(Self {
91            hummock_url,
92            data_dir,
93            heartbeat_handle: None,
94            heartbeat_shutdown_sender: None,
95            use_new_object_prefix_strategy,
96        })
97    }
98
99    fn get_storage_opts(&self) -> StorageOpts {
100        let mut opts = StorageOpts {
101            share_buffer_compaction_worker_threads_number: 0,
102            meta_cache_capacity_mb: 1,
103            block_cache_capacity_mb: 1,
104            meta_cache_shard_num: 1,
105            block_cache_shard_num: 1,
106            ..Default::default()
107        };
108
109        if let Some(dir) = &self.data_dir {
110            opts.data_directory.clone_from(dir);
111        }
112
113        opts
114    }
115
116    pub async fn create_hummock_store_with_metrics(
117        &mut self,
118        meta_client: &MetaClient,
119    ) -> Result<(MonitoredStateStore<HummockStorage>, Metrics)> {
120        let (heartbeat_handle, heartbeat_shutdown_sender) =
121            MetaClient::start_heartbeat_loop(meta_client.clone(), Duration::from_millis(1000));
122        self.heartbeat_handle = Some(heartbeat_handle);
123        self.heartbeat_shutdown_sender = Some(heartbeat_shutdown_sender);
124
125        // FIXME: allow specify custom config
126        let opts = self.get_storage_opts();
127
128        tracing::info!("using StorageOpts: {:#?}", opts);
129
130        let metrics = Metrics {
131            hummock_metrics: Arc::new(HummockMetrics::unused()),
132            state_store_metrics: Arc::new(HummockStateStoreMetrics::unused()),
133            object_store_metrics: Arc::new(ObjectStoreMetrics::unused()),
134            storage_metrics: Arc::new(MonitoredStorageMetrics::unused()),
135            compactor_metrics: Arc::new(CompactorMetrics::unused()),
136        };
137
138        let state_store_impl = StateStoreImpl::new(
139            &self.hummock_url,
140            Arc::new(opts),
141            Arc::new(MonitoredHummockMetaClient::new(
142                meta_client.clone(),
143                metrics.hummock_metrics.clone(),
144            )),
145            metrics.state_store_metrics.clone(),
146            metrics.object_store_metrics.clone(),
147            metrics.storage_metrics.clone(),
148            metrics.compactor_metrics.clone(),
149            None,
150            self.use_new_object_prefix_strategy,
151        )
152        .await?;
153
154        if let Some(hummock_state_store) = state_store_impl.as_hummock() {
155            Ok((
156                hummock_state_store
157                    .clone()
158                    .monitored(metrics.storage_metrics.clone()),
159                metrics,
160            ))
161        } else {
162            Err(anyhow!("only Hummock state store is supported in risectl"))
163        }
164    }
165
166    pub async fn create_sstable_store(
167        &self,
168        use_new_object_prefix_strategy: bool,
169    ) -> Result<Arc<SstableStore>> {
170        let object_store = build_remote_object_store(
171            self.hummock_url.strip_prefix("hummock+").unwrap(),
172            Arc::new(ObjectStoreMetrics::unused()),
173            "Hummock",
174            Arc::new(ObjectStoreConfig::default()),
175        )
176        .await;
177
178        let opts = self.get_storage_opts();
179
180        let meta_cache = HybridCacheBuilder::new()
181            .memory(opts.meta_cache_capacity_mb * (1 << 20))
182            .with_shards(opts.meta_cache_shard_num)
183            .storage()
184            .build()
185            .await?;
186        let block_cache = HybridCacheBuilder::new()
187            .memory(opts.block_cache_capacity_mb * (1 << 20))
188            .with_shards(opts.block_cache_shard_num)
189            .storage()
190            .build()
191            .await?;
192
193        Ok(Arc::new(SstableStore::new(SstableStoreConfig {
194            store: Arc::new(object_store),
195            path: opts.data_directory,
196            prefetch_buffer_capacity: opts.block_cache_capacity_mb * (1 << 20),
197            max_prefetch_block_number: opts.max_prefetch_block_number,
198            recent_filter: Arc::new(NoneRecentFilter::default().into()),
199            state_store_metrics: Arc::new(global_hummock_state_store_metrics(
200                MetricLevel::Disabled,
201            )),
202            use_new_object_prefix_strategy,
203            meta_cache,
204            block_cache,
205            vector_meta_cache: CacheBuilder::new(1 << 10).build(),
206            vector_block_cache: CacheBuilder::new(1 << 10).build(),
207        })))
208    }
209}