// risingwave_rpc_client/compactor_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use risingwave_common::monitor::EndpointExt;
19use risingwave_common::util::addr::HostAddr;
20use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
21use risingwave_pb::hummock::{
22    GetNewSstIdsRequest, GetNewSstIdsResponse, ReportCompactionTaskRequest,
23    ReportCompactionTaskResponse,
24};
25use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
26use risingwave_pb::meta::{GetSystemParamsRequest, GetSystemParamsResponse};
27use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
28use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
29use tokio::sync::RwLock;
30use tokio_retry::strategy::{ExponentialBackoff, jitter};
31use tonic::transport::{Channel, Endpoint};
32
33use crate::error::{Result, RpcError};
34use crate::retry_rpc;
/// HTTP/2 keep-alive ping interval for proxy channels, in seconds.
const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
/// How long to wait for a keep-alive ping acknowledgement before the
/// channel is considered dead, in seconds.
const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;

/// Base delay for the retry backoff, in milliseconds
/// (fed to `ExponentialBackoff::from_millis`).
const DEFAULT_RETRY_INTERVAL: u64 = 20;
/// Upper bound on any single backoff delay.
const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_secs(5);
/// Maximum number of retry attempts per RPC.
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 3;
/// Direct (non-proxied) client to a compactor node, currently exposing only
/// its monitor service. Cloning is cheap: the underlying `Channel` is shared.
#[derive(Clone)]
pub struct CompactorClient {
    pub monitor_client: MonitorServiceClient<Channel>,
}
45
46impl CompactorClient {
47    pub async fn new(host_addr: HostAddr) -> Result<Self> {
48        let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
49            .connect_timeout(Duration::from_secs(5))
50            .monitored_connect("grpc-compactor-client", Default::default())
51            .await?;
52        Ok(Self {
53            monitor_client: MonitorServiceClient::new(channel),
54        })
55    }
56
57    pub async fn stack_trace(&self) -> Result<StackTraceResponse> {
58        Ok(self
59            .monitor_client
60            .to_owned()
61            .stack_trace(StackTraceRequest::default())
62            .await
63            .map_err(RpcError::from_compactor_status)?
64            .into_inner())
65    }
66}
67
/// The swappable inner state of [`GrpcCompactorProxyClient`]: both service
/// clients share one `Channel` and are replaced together on reconnect.
#[derive(Debug, Clone)]
pub struct GrpcCompactorProxyClientCore {
    hummock_client: HummockManagerServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
}
73
74impl GrpcCompactorProxyClientCore {
75    pub(crate) fn new(channel: Channel) -> Self {
76        let hummock_client =
77            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
78        let system_params_client = SystemParamsServiceClient::new(channel);
79
80        Self {
81            hummock_client,
82            system_params_client,
83        }
84    }
85}
86
/// Client to proxy server. Cloning the instance is lightweight.
///
/// Todo(wcy-fdu): add refresh client interface.
#[derive(Debug, Clone)]
pub struct GrpcCompactorProxyClient {
    // Shared, refreshable client state; replaced wholesale after RPC failures.
    pub core: Arc<RwLock<GrpcCompactorProxyClientCore>>,
    // Proxy endpoint URI, retained so the channel can be re-established.
    endpoint: String,
}
95
96impl GrpcCompactorProxyClient {
97    pub async fn new(endpoint: String) -> Self {
98        let channel = Self::connect_to_endpoint(endpoint.clone()).await;
99        let core = Arc::new(RwLock::new(GrpcCompactorProxyClientCore::new(channel)));
100        Self { core, endpoint }
101    }
102
103    async fn recreate_core(&self) {
104        tracing::info!("GrpcCompactorProxyClient rpc transfer failed, try to reconnect");
105        let channel = Self::connect_to_endpoint(self.endpoint.clone()).await;
106        let mut core = self.core.write().await;
107        *core = GrpcCompactorProxyClientCore::new(channel);
108    }
109
110    async fn connect_to_endpoint(endpoint: String) -> Channel {
111        let endpoint = Endpoint::from_shared(endpoint).expect("Fail to construct tonic Endpoint");
112        endpoint
113            .http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
114            .keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
115            .connect_timeout(Duration::from_secs(5))
116            .monitored_connect("grpc-compactor-proxy-client", Default::default())
117            .await
118            .expect("Failed to create channel via proxy rpc endpoint.")
119    }
120
121    pub async fn get_new_sst_ids(
122        &self,
123        request: GetNewSstIdsRequest,
124    ) -> std::result::Result<tonic::Response<GetNewSstIdsResponse>, tonic::Status> {
125        retry_rpc!(self, get_new_sst_ids, request, GetNewSstIdsResponse)
126    }
127
128    pub async fn report_compaction_task(
129        &self,
130        request: ReportCompactionTaskRequest,
131    ) -> std::result::Result<tonic::Response<ReportCompactionTaskResponse>, tonic::Status> {
132        retry_rpc!(
133            self,
134            report_compaction_task,
135            request,
136            ReportCompactionTaskResponse
137        )
138    }
139
140    pub async fn get_system_params(
141        &self,
142    ) -> std::result::Result<tonic::Response<GetSystemParamsResponse>, tonic::Status> {
143        tokio_retry::RetryIf::spawn(
144            Self::get_retry_strategy(),
145            || async {
146                let mut system_params_client = self.core.read().await.system_params_client.clone();
147                let rpc_res = system_params_client
148                    .get_system_params(GetSystemParamsRequest {})
149                    .await;
150                if rpc_res.is_err() {
151                    self.recreate_core().await;
152                }
153                rpc_res
154            },
155            Self::should_retry,
156        )
157        .await
158    }
159
160    #[inline(always)]
161    fn get_retry_strategy() -> impl Iterator<Item = Duration> {
162        ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL)
163            .max_delay(DEFAULT_RETRY_MAX_DELAY)
164            .take(DEFAULT_RETRY_MAX_ATTEMPTS)
165            .map(jitter)
166    }
167
168    #[inline(always)]
169    fn should_retry(status: &tonic::Status) -> bool {
170        if status.code() == tonic::Code::Unavailable
171            || status.code() == tonic::Code::Unknown
172            || (status.code() == tonic::Code::Unauthenticated
173                && status.message().contains("invalid auth token"))
174        {
175            return true;
176        }
177        false
178    }
179}
180
/// Expands to a retried hummock RPC call.
///
/// `$self` must expose `core` (an `Arc<RwLock<_>>` whose inner value holds a
/// clonable `hummock_client`) and an async `recreate_core` method, and the
/// enclosing `impl` must provide `Self::get_retry_strategy` and
/// `Self::should_retry`. `$request` is cloned once per attempt.
/// Note: `$response` is accepted for call-site readability but is not used
/// in the expansion.
#[macro_export]
macro_rules! retry_rpc {
    ($self:expr, $rpc_call:ident, $request:expr, $response:ty) => {
        tokio_retry::RetryIf::spawn(
            Self::get_retry_strategy(),
            || async {
                // Clone the client out of the read guard so the lock is not
                // held across the `.await`.
                let mut hummock_client = $self.core.read().await.hummock_client.clone();
                let rpc_res = hummock_client.$rpc_call($request.clone()).await;
                if rpc_res.is_err() {
                    // Reconnect before any retry so the next attempt uses a
                    // fresh channel.
                    $self.recreate_core().await;
                }
                rpc_res
            },
            Self::should_retry,
        )
        .await
    };
}