risingwave_ctl/cmd_impl/hummock/
resize_cache.rs1use std::process::exit;
16
17use futures::future::try_join_all;
18use risingwave_common::config::RpcClientConfig;
19use risingwave_pb::compute::ResizeCacheRequest;
20use risingwave_pb::meta::GetClusterInfoResponse;
21use risingwave_rpc_client::ComputeClient;
22use thiserror_ext::AsReport;
23
24use crate::common::CtlContext;
25
/// Prints a formatted message to standard output and terminates the process with exit code 1.
///
/// Accepts the same arguments as `println!`. The expansion never returns, so the macro can be
/// used wherever a value of any type is expected (for example as a `match` arm).
macro_rules! fail {
    ($($message:tt)*) => {{
        println!($($message)*);
        exit(1);
    }};
}
32
33pub async fn resize_cache(
34 context: &CtlContext,
35 meta_cache_capacity: Option<u64>,
36 data_cache_capacity: Option<u64>,
37) -> anyhow::Result<()> {
38 let meta_client = context.meta_client().await?;
39
40 let GetClusterInfoResponse { worker_nodes, .. } = match meta_client.get_cluster_info().await {
41 Ok(resp) => resp,
42 Err(e) => {
43 fail!("Failed to get cluster info: {}", e.as_report());
44 }
45 };
46
47 let futures = worker_nodes.iter().map(|worker| async {
48 let addr = worker.get_host().expect("worker host must be set");
49 let client = ComputeClient::new(addr.into(), &RpcClientConfig::default())
50 .await
51 .unwrap_or_else(|_| panic!("Cannot open client to compute node {addr:?}"));
52 client
53 .resize_cache(ResizeCacheRequest {
54 meta_cache_capacity: meta_cache_capacity.unwrap_or(0),
55 data_cache_capacity: data_cache_capacity.unwrap_or(0),
56 })
57 .await
58 });
59
60 if let Err(e) = try_join_all(futures).await {
61 fail!("Failed to resize cache: {}", e.as_report())
62 }
63
64 Ok(())
65}