risingwave_ctl/cmd_impl/hummock/
resize_cache.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::process::exit;
16
17use futures::future::try_join_all;
18use risingwave_common::config::RpcClientConfig;
19use risingwave_pb::compute::ResizeCacheRequest;
20use risingwave_pb::meta::GetClusterInfoResponse;
21use risingwave_rpc_client::ComputeClient;
22use thiserror_ext::AsReport;
23
24use crate::common::CtlContext;
25
/// Prints a formatted message to stdout and terminates the process with exit status 1.
///
/// Takes the same arguments as `println!`. The expansion never returns, so it can be
/// used wherever a value of any type is expected (for example in a `match` arm).
macro_rules! fail {
    ($($message:tt)*) => {
        {
            println!($($message)*);
            exit(1);
        }
    };
}
32
33pub async fn resize_cache(
34    context: &CtlContext,
35    meta_cache_capacity: Option<u64>,
36    data_cache_capacity: Option<u64>,
37) -> anyhow::Result<()> {
38    let meta_client = context.meta_client().await?;
39
40    let GetClusterInfoResponse { worker_nodes, .. } = match meta_client.get_cluster_info().await {
41        Ok(resp) => resp,
42        Err(e) => {
43            fail!("Failed to get cluster info: {}", e.as_report());
44        }
45    };
46
47    let futures = worker_nodes.iter().map(|worker| async {
48        let addr = worker.get_host().expect("worker host must be set");
49        let client = ComputeClient::new(addr.into(), &RpcClientConfig::default())
50            .await
51            .unwrap_or_else(|_| panic!("Cannot open client to compute node {addr:?}"));
52        client
53            .resize_cache(ResizeCacheRequest {
54                meta_cache_capacity: meta_cache_capacity.unwrap_or(0),
55                data_cache_capacity: data_cache_capacity.unwrap_or(0),
56            })
57            .await
58    });
59
60    if let Err(e) = try_join_all(futures).await {
61        fail!("Failed to resize cache: {}", e.as_report())
62    }
63
64    Ok(())
65}