risingwave_ctl/cmd_impl/hummock/
tiered_cache_tracing.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use futures::future::try_join_all;
18use itertools::Itertools;
19use risingwave_common::monitor::EndpointExt;
20use risingwave_pb::monitor_service::TieredCacheTracingRequest;
21use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
22use tonic::transport::Endpoint;
23
24use crate::common::CtlContext;
25
26pub async fn tiered_cache_tracing(
27    context: &CtlContext,
28    enable: bool,
29    record_hybrid_insert_threshold_ms: Option<u32>,
30    record_hybrid_get_threshold_ms: Option<u32>,
31    record_hybrid_obtain_threshold_ms: Option<u32>,
32    record_hybrid_remove_threshold_ms: Option<u32>,
33    record_hybrid_fetch_threshold_ms: Option<u32>,
34) -> anyhow::Result<()> {
35    let meta_client = context.meta_client().await?;
36    let info = meta_client.get_cluster_info().await?;
37    let futures = info
38        .get_worker_nodes()
39        .iter()
40        .map(|worker_node| async {
41            let addr = worker_node.get_host().unwrap();
42            let channel = Endpoint::from_shared(format!("http://{}:{}", addr.host, addr.port))?
43                .connect_timeout(Duration::from_secs(5))
44                .monitored_connect("grpc-tiered-cache-tracing-client", Default::default())
45                .await?;
46            let mut client = MonitorServiceClient::new(channel);
47            client
48                .tiered_cache_tracing(TieredCacheTracingRequest {
49                    enable,
50                    record_hybrid_insert_threshold_ms,
51                    record_hybrid_get_threshold_ms,
52                    record_hybrid_obtain_threshold_ms,
53                    record_hybrid_remove_threshold_ms,
54                    record_hybrid_fetch_threshold_ms,
55                })
56                .await?;
57            Ok::<_, anyhow::Error>(())
58        })
59        .collect_vec();
60    try_join_all(futures).await?;
61    Ok(())
62}