risingwave_ctl/cmd_impl/hummock/
tiered_cache_tracing.rs1use std::time::Duration;
16
17use futures::future::try_join_all;
18use itertools::Itertools;
19use risingwave_common::monitor::EndpointExt;
20use risingwave_pb::configured_monitor_service_client;
21use risingwave_pb::monitor_service::TieredCacheTracingRequest;
22use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
23use tonic::transport::Endpoint;
24
25use crate::common::CtlContext;
26
27pub async fn tiered_cache_tracing(
28 context: &CtlContext,
29 enable: bool,
30 record_hybrid_insert_threshold_ms: Option<u32>,
31 record_hybrid_get_threshold_ms: Option<u32>,
32 record_hybrid_obtain_threshold_ms: Option<u32>,
33 record_hybrid_remove_threshold_ms: Option<u32>,
34 record_hybrid_fetch_threshold_ms: Option<u32>,
35) -> anyhow::Result<()> {
36 let meta_client = context.meta_client().await?;
37 let info = meta_client.get_cluster_info().await?;
38 let futures = info
39 .get_worker_nodes()
40 .iter()
41 .map(|worker_node| async {
42 let addr = worker_node.get_host().unwrap();
43 let channel = Endpoint::from_shared(format!("http://{}:{}", addr.host, addr.port))?
44 .connect_timeout(Duration::from_secs(5))
45 .monitored_connect("grpc-tiered-cache-tracing-client", Default::default())
46 .await?;
47 let mut client = configured_monitor_service_client(MonitorServiceClient::new(channel));
48 client
49 .tiered_cache_tracing(TieredCacheTracingRequest {
50 enable,
51 record_hybrid_insert_threshold_ms,
52 record_hybrid_get_threshold_ms,
53 record_hybrid_obtain_threshold_ms,
54 record_hybrid_remove_threshold_ms,
55 record_hybrid_fetch_threshold_ms,
56 })
57 .await?;
58 Ok::<_, anyhow::Error>(())
59 })
60 .collect_vec();
61 try_join_all(futures).await?;
62 Ok(())
63}