Skip to main content

risingwave_ctl/cmd_impl/hummock/
tiered_cache_tracing.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use futures::future::try_join_all;
18use itertools::Itertools;
19use risingwave_common::monitor::EndpointExt;
20use risingwave_pb::configured_monitor_service_client;
21use risingwave_pb::monitor_service::TieredCacheTracingRequest;
22use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
23use tonic::transport::Endpoint;
24
25use crate::common::CtlContext;
26
27pub async fn tiered_cache_tracing(
28    context: &CtlContext,
29    enable: bool,
30    record_hybrid_insert_threshold_ms: Option<u32>,
31    record_hybrid_get_threshold_ms: Option<u32>,
32    record_hybrid_obtain_threshold_ms: Option<u32>,
33    record_hybrid_remove_threshold_ms: Option<u32>,
34    record_hybrid_fetch_threshold_ms: Option<u32>,
35) -> anyhow::Result<()> {
36    let meta_client = context.meta_client().await?;
37    let info = meta_client.get_cluster_info().await?;
38    let futures = info
39        .get_worker_nodes()
40        .iter()
41        .map(|worker_node| async {
42            let addr = worker_node.get_host().unwrap();
43            let channel = Endpoint::from_shared(format!("http://{}:{}", addr.host, addr.port))?
44                .connect_timeout(Duration::from_secs(5))
45                .monitored_connect("grpc-tiered-cache-tracing-client", Default::default())
46                .await?;
47            let mut client = configured_monitor_service_client(MonitorServiceClient::new(channel));
48            client
49                .tiered_cache_tracing(TieredCacheTracingRequest {
50                    enable,
51                    record_hybrid_insert_threshold_ms,
52                    record_hybrid_get_threshold_ms,
53                    record_hybrid_obtain_threshold_ms,
54                    record_hybrid_remove_threshold_ms,
55                    record_hybrid_fetch_threshold_ms,
56                })
57                .await?;
58            Ok::<_, anyhow::Error>(())
59        })
60        .collect_vec();
61    try_join_all(futures).await?;
62    Ok(())
63}