risingwave_ctl/cmd_impl/hummock/
tiered_cache_tracing.rs1use std::time::Duration;
16
17use futures::future::try_join_all;
18use itertools::Itertools;
19use risingwave_common::monitor::EndpointExt;
20use risingwave_pb::monitor_service::TieredCacheTracingRequest;
21use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
22use tonic::transport::Endpoint;
23
24use crate::common::CtlContext;
25
26pub async fn tiered_cache_tracing(
27 context: &CtlContext,
28 enable: bool,
29 record_hybrid_insert_threshold_ms: Option<u32>,
30 record_hybrid_get_threshold_ms: Option<u32>,
31 record_hybrid_obtain_threshold_ms: Option<u32>,
32 record_hybrid_remove_threshold_ms: Option<u32>,
33 record_hybrid_fetch_threshold_ms: Option<u32>,
34) -> anyhow::Result<()> {
35 let meta_client = context.meta_client().await?;
36 let info = meta_client.get_cluster_info().await?;
37 let futures = info
38 .get_worker_nodes()
39 .iter()
40 .map(|worker_node| async {
41 let addr = worker_node.get_host().unwrap();
42 let channel = Endpoint::from_shared(format!("http://{}:{}", addr.host, addr.port))?
43 .connect_timeout(Duration::from_secs(5))
44 .monitored_connect("grpc-tiered-cache-tracing-client", Default::default())
45 .await?;
46 let mut client = MonitorServiceClient::new(channel);
47 client
48 .tiered_cache_tracing(TieredCacheTracingRequest {
49 enable,
50 record_hybrid_insert_threshold_ms,
51 record_hybrid_get_threshold_ms,
52 record_hybrid_obtain_threshold_ms,
53 record_hybrid_remove_threshold_ms,
54 record_hybrid_fetch_threshold_ms,
55 })
56 .await?;
57 Ok::<_, anyhow::Error>(())
58 })
59 .collect_vec();
60 try_join_all(futures).await?;
61 Ok(())
62}