risingwave_rpc_client/
compactor_client.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use risingwave_common::monitor::EndpointExt;
19use risingwave_common::util::addr::HostAddr;
20use risingwave_pb::configured_monitor_service_client;
21use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
22use risingwave_pb::hummock::{
23    GetNewObjectIdsRequest, GetNewObjectIdsResponse, ReportCompactionTaskRequest,
24    ReportCompactionTaskResponse,
25};
26use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
27use risingwave_pb::meta::{GetSystemParamsRequest, GetSystemParamsResponse};
28use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
29use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
30use tokio::sync::RwLock;
31use tokio_retry::strategy::{ExponentialBackoff, jitter};
32use tonic::transport::{Channel, Endpoint};
33
34use crate::error::{Result, RpcError};
35use crate::retry_rpc;
// ---- Channel keep-alive settings (applied in `connect_to_endpoint`) ----

/// Value, in seconds, passed to `http2_keep_alive_interval` on the proxy endpoint.
const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
/// Value, in seconds, passed to `keep_alive_timeout` on the proxy endpoint.
const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;

// ---- Retry settings (consumed by `get_retry_strategy`) ----

/// Base handed to `ExponentialBackoff::from_millis`, i.e. expressed in milliseconds.
const DEFAULT_RETRY_INTERVAL: u64 = 20;
/// Cap applied to every individual backoff delay.
const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_secs(5);
/// Number of delays taken from the backoff iterator, which bounds how often a call is retried.
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 3;
42#[derive(Clone)]
43pub struct CompactorClient {
44    pub monitor_client: MonitorServiceClient<Channel>,
45}
46
47impl CompactorClient {
48    pub async fn new(host_addr: HostAddr) -> Result<Self> {
49        let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
50            .connect_timeout(Duration::from_secs(5))
51            .monitored_connect("grpc-compactor-client", Default::default())
52            .await?;
53        Ok(Self {
54            monitor_client: configured_monitor_service_client(MonitorServiceClient::new(channel)),
55        })
56    }
57
58    pub async fn stack_trace(&self, req: StackTraceRequest) -> Result<StackTraceResponse> {
59        Ok(self
60            .monitor_client
61            .clone()
62            .stack_trace(req)
63            .await
64            .map_err(RpcError::from_compactor_status)?
65            .into_inner())
66    }
67}
68
69#[derive(Debug, Clone)]
70pub struct GrpcCompactorProxyClientCore {
71    hummock_client: HummockManagerServiceClient<Channel>,
72    system_params_client: SystemParamsServiceClient<Channel>,
73}
74
75impl GrpcCompactorProxyClientCore {
76    pub(crate) fn new(channel: Channel) -> Self {
77        let hummock_client =
78            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
79        let system_params_client = SystemParamsServiceClient::new(channel);
80
81        Self {
82            hummock_client,
83            system_params_client,
84        }
85    }
86}
87
88/// Client to proxy server. Cloning the instance is lightweight.
89///
90/// Todo(wcy-fdu): add refresh client interface.
91#[derive(Debug, Clone)]
92pub struct GrpcCompactorProxyClient {
93    pub core: Arc<RwLock<GrpcCompactorProxyClientCore>>,
94    endpoint: String,
95}
96
97impl GrpcCompactorProxyClient {
98    pub async fn new(endpoint: String) -> Self {
99        let channel = Self::connect_to_endpoint(endpoint.clone()).await;
100        let core = Arc::new(RwLock::new(GrpcCompactorProxyClientCore::new(channel)));
101        Self { core, endpoint }
102    }
103
104    async fn recreate_core(&self) {
105        tracing::info!("GrpcCompactorProxyClient rpc transfer failed, try to reconnect");
106        let channel = Self::connect_to_endpoint(self.endpoint.clone()).await;
107        let mut core = self.core.write().await;
108        *core = GrpcCompactorProxyClientCore::new(channel);
109    }
110
111    async fn connect_to_endpoint(endpoint: String) -> Channel {
112        let endpoint = Endpoint::from_shared(endpoint).expect("Fail to construct tonic Endpoint");
113        endpoint
114            .http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
115            .keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
116            .connect_timeout(Duration::from_secs(5))
117            .monitored_connect("grpc-compactor-proxy-client", Default::default())
118            .await
119            .expect("Failed to create channel via proxy rpc endpoint.")
120    }
121
122    pub async fn get_new_sst_ids(
123        &self,
124        request: GetNewObjectIdsRequest,
125    ) -> std::result::Result<tonic::Response<GetNewObjectIdsResponse>, tonic::Status> {
126        retry_rpc!(self, get_new_object_ids, request, GetNewObjectIdsResponse)
127    }
128
129    pub async fn report_compaction_task(
130        &self,
131        request: ReportCompactionTaskRequest,
132    ) -> std::result::Result<tonic::Response<ReportCompactionTaskResponse>, tonic::Status> {
133        retry_rpc!(
134            self,
135            report_compaction_task,
136            request,
137            ReportCompactionTaskResponse
138        )
139    }
140
141    pub async fn get_system_params(
142        &self,
143    ) -> std::result::Result<tonic::Response<GetSystemParamsResponse>, tonic::Status> {
144        tokio_retry::RetryIf::spawn(
145            Self::get_retry_strategy(),
146            || async {
147                let mut system_params_client = self.core.read().await.system_params_client.clone();
148                let rpc_res = system_params_client
149                    .get_system_params(GetSystemParamsRequest {})
150                    .await;
151                if rpc_res.is_err() {
152                    self.recreate_core().await;
153                }
154                rpc_res
155            },
156            Self::should_retry,
157        )
158        .await
159    }
160
161    #[inline(always)]
162    fn get_retry_strategy() -> impl Iterator<Item = Duration> {
163        ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL)
164            .max_delay(DEFAULT_RETRY_MAX_DELAY)
165            .take(DEFAULT_RETRY_MAX_ATTEMPTS)
166            .map(jitter)
167    }
168
169    #[inline(always)]
170    fn should_retry(status: &tonic::Status) -> bool {
171        if status.code() == tonic::Code::Unavailable
172            || status.code() == tonic::Code::Unknown
173            || (status.code() == tonic::Code::Unauthenticated
174                && status.message().contains("invalid auth token"))
175        {
176            return true;
177        }
178        false
179    }
180}
181
/// Issues `$rpc_call` on the Hummock client held in `$self.core`, with retries.
///
/// The expansion refers to `Self::get_retry_strategy()` and `Self::should_retry(..)`, so it must
/// be used inside an `impl` that provides both. `$self` must expose a `core` lock containing a
/// `hummock_client` and an async `recreate_core()` method.
///
/// Parameters:
/// * `$self` – the client value (typically `self`).
/// * `$rpc_call` – name of the method to invoke on the Hummock client.
/// * `$request` – request expression; it is cloned for every attempt.
/// * `$response` – response type; currently unused by the expansion.
///
/// The channel is recreated only after a failure that `Self::should_retry` classifies as
/// retryable. Any other status is handed back unchanged, without reconnecting.
///
/// Evaluates to the `Result` of the last attempt.
#[macro_export]
macro_rules! retry_rpc {
    ($self:expr, $rpc_call:ident, $request:expr, $response:ty) => {
        tokio_retry::RetryIf::spawn(
            Self::get_retry_strategy(),
            || async {
                // The read guard is a temporary of this statement, so it is released before the
                // RPC is awaited and before `recreate_core` is called.
                let mut hummock_client = $self.core.read().await.hummock_client.clone();
                let rpc_res = hummock_client.$rpc_call($request.clone()).await;
                if matches!(&rpc_res, Err(status) if Self::should_retry(status)) {
                    $self.recreate_core().await;
                }
                rpc_res
            },
            Self::should_retry,
        )
        .await
    };
}
199}