1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
1415use std::time::Duration;
1617use futures::future::try_join_all;
18use itertools::Itertools;
19use risingwave_common::monitor::EndpointExt;
20use risingwave_pb::monitor_service::TieredCacheTracingRequest;
21use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
22use tonic::transport::Endpoint;
2324use crate::common::CtlContext;
2526pub async fn tiered_cache_tracing(
27 context: &CtlContext,
28 enable: bool,
29 record_hybrid_insert_threshold_ms: Option<u32>,
30 record_hybrid_get_threshold_ms: Option<u32>,
31 record_hybrid_obtain_threshold_ms: Option<u32>,
32 record_hybrid_remove_threshold_ms: Option<u32>,
33 record_hybrid_fetch_threshold_ms: Option<u32>,
34) -> anyhow::Result<()> {
35let meta_client = context.meta_client().await?;
36let info = meta_client.get_cluster_info().await?;
37let futures = info
38 .get_worker_nodes()
39 .iter()
40 .map(|worker_node| async {
41let addr = worker_node.get_host().unwrap();
42let channel = Endpoint::from_shared(format!("http://{}:{}", addr.host, addr.port))?
43.connect_timeout(Duration::from_secs(5))
44 .monitored_connect("grpc-tiered-cache-tracing-client", Default::default())
45 .await?;
46let mut client = MonitorServiceClient::new(channel);
47 client
48 .tiered_cache_tracing(TieredCacheTracingRequest {
49 enable,
50 record_hybrid_insert_threshold_ms,
51 record_hybrid_get_threshold_ms,
52 record_hybrid_obtain_threshold_ms,
53 record_hybrid_remove_threshold_ms,
54 record_hybrid_fetch_threshold_ms,
55 })
56 .await?;
57Ok::<_, anyhow::Error>(())
58 })
59 .collect_vec();
60 try_join_all(futures).await?;
61Ok(())
62}