// risingwave_rpc_client/compactor_client.rs

use std::sync::Arc;
use std::time::Duration;

use risingwave_common::monitor::EndpointExt;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
use risingwave_pb::hummock::{
    GetNewSstIdsRequest, GetNewSstIdsResponse, ReportCompactionTaskRequest,
    ReportCompactionTaskResponse,
};
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::{GetSystemParamsRequest, GetSystemParamsResponse};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use tokio::sync::RwLock;
use tokio_retry::strategy::{ExponentialBackoff, jitter};
use tonic::transport::{Channel, Endpoint};

use crate::error::{Result, RpcError};
use crate::retry_rpc;

const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;

const DEFAULT_RETRY_INTERVAL: u64 = 20;
const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_secs(5);
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 3;

#[derive(Clone)]
pub struct CompactorClient {
    pub monitor_client: MonitorServiceClient<Channel>,
}

impl CompactorClient {
    pub async fn new(host_addr: HostAddr) -> Result<Self> {
        let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-compactor-client", Default::default())
            .await?;
        Ok(Self {
            monitor_client: MonitorServiceClient::new(channel),
        })
    }

    pub async fn stack_trace(&self) -> Result<StackTraceResponse> {
        Ok(self
            .monitor_client
            .to_owned()
            .stack_trace(StackTraceRequest::default())
            .await
            .map_err(RpcError::from_compactor_status)?
            .into_inner())
    }
}
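
// A minimal usage sketch (illustrative, not part of the original file; the
// address is a placeholder, and `HostAddr` is assumed to parse from a
// `host:port` string):
//
//     let client = CompactorClient::new("127.0.0.1:6660".parse::<HostAddr>()?).await?;
//     let traces: StackTraceResponse = client.stack_trace().await?;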

#[derive(Debug, Clone)]
pub struct GrpcCompactorProxyClientCore {
    hummock_client: HummockManagerServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
}

impl GrpcCompactorProxyClientCore {
    pub(crate) fn new(channel: Channel) -> Self {
        let hummock_client =
            HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
        let system_params_client = SystemParamsServiceClient::new(channel);

        Self {
            hummock_client,
            system_params_client,
        }
    }
}

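/// A proxy client that forwards Hummock and system-params RPCs through a
/// single gRPC endpoint, rebuilding the underlying channel whenever a call
/// fails.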
#[derive(Debug, Clone)]
pub struct GrpcCompactorProxyClient {
    pub core: Arc<RwLock<GrpcCompactorProxyClientCore>>,
    endpoint: String,
}

impl GrpcCompactorProxyClient {
    pub async fn new(endpoint: String) -> Self {
        let channel = Self::connect_to_endpoint(endpoint.clone()).await;
        let core = Arc::new(RwLock::new(GrpcCompactorProxyClientCore::new(channel)));
        Self { core, endpoint }
    }

    async fn recreate_core(&self) {
        tracing::info!("GrpcCompactorProxyClient rpc transfer failed, trying to reconnect");
        let channel = Self::connect_to_endpoint(self.endpoint.clone()).await;
        let mut core = self.core.write().await;
        *core = GrpcCompactorProxyClientCore::new(channel);
    }

    async fn connect_to_endpoint(endpoint: String) -> Channel {
        let endpoint =
            Endpoint::from_shared(endpoint).expect("Failed to construct tonic Endpoint");
        endpoint
            .http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
            .keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
            .connect_timeout(Duration::from_secs(5))
            .monitored_connect("grpc-compactor-proxy-client", Default::default())
            .await
            .expect("Failed to create channel via proxy rpc endpoint.")
    }

    pub async fn get_new_sst_ids(
        &self,
        request: GetNewSstIdsRequest,
    ) -> std::result::Result<tonic::Response<GetNewSstIdsResponse>, tonic::Status> {
        retry_rpc!(self, get_new_sst_ids, request, GetNewSstIdsResponse)
    }

    pub async fn report_compaction_task(
        &self,
        request: ReportCompactionTaskRequest,
    ) -> std::result::Result<tonic::Response<ReportCompactionTaskResponse>, tonic::Status> {
        retry_rpc!(
            self,
            report_compaction_task,
            request,
            ReportCompactionTaskResponse
        )
    }

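    /// Note that `retry_rpc!` is hard-wired to `hummock_client`, so this RPC
    /// on `system_params_client` spells out the same retry-and-reconnect loop
    /// by hand.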
    pub async fn get_system_params(
        &self,
    ) -> std::result::Result<tonic::Response<GetSystemParamsResponse>, tonic::Status> {
        tokio_retry::RetryIf::spawn(
            Self::get_retry_strategy(),
            || async {
                let mut system_params_client = self.core.read().await.system_params_client.clone();
                let rpc_res = system_params_client
                    .get_system_params(GetSystemParamsRequest {})
                    .await;
                if rpc_res.is_err() {
                    self.recreate_core().await;
                }
                rpc_res
            },
            Self::should_retry,
        )
        .await
    }

    #[inline(always)]
    fn get_retry_strategy() -> impl Iterator<Item = Duration> {
        ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL)
            .max_delay(DEFAULT_RETRY_MAX_DELAY)
            .take(DEFAULT_RETRY_MAX_ATTEMPTS)
            .map(jitter)
    }
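
    // A worked example of the schedule above (illustrative, not from the
    // original source): `ExponentialBackoff::from_millis(20)` yields delays of
    // 20 ms, 400 ms, 8000 ms, ...; `max_delay` caps each value at 5 s and
    // `take(3)` bounds the loop at three retries (four attempts in total,
    // counting the initial call). `jitter` then scales each delay by a random
    // factor to avoid thundering herds.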

    #[inline(always)]
    fn should_retry(status: &tonic::Status) -> bool {
        status.code() == tonic::Code::Unavailable
            || status.code() == tonic::Code::Unknown
            || (status.code() == tonic::Code::Unauthenticated
                && status.message().contains("invalid auth token"))
    }
}
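
// A minimal usage sketch (illustrative, not part of the original file; the
// proxy endpoint below is a placeholder):
//
//     let proxy = GrpcCompactorProxyClient::new("http://127.0.0.1:5690".to_owned()).await;
//     let sst_ids = proxy
//         .get_new_sst_ids(GetNewSstIdsRequest::default())
//         .await?
//         .into_inner();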

#[macro_export]
macro_rules! retry_rpc {
    ($self:expr, $rpc_call:ident, $request:expr, $response:ty) => {
        tokio_retry::RetryIf::spawn(
            Self::get_retry_strategy(),
            || async {
                let mut hummock_client = $self.core.read().await.hummock_client.clone();
                let rpc_res = hummock_client.$rpc_call($request.clone()).await;
                if rpc_res.is_err() {
                    $self.recreate_core().await;
                }
                rpc_res
            },
            Self::should_retry,
        )
        .await
    };
}
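
// For reference, `retry_rpc!(self, get_new_sst_ids, request, GetNewSstIdsResponse)`
// expands to roughly the following (a sketch; compare `get_system_params`, which
// writes the same loop out by hand against `system_params_client`). Note that the
// `$response` type parameter only serves as documentation; the macro body never
// uses it.
//
//     tokio_retry::RetryIf::spawn(
//         Self::get_retry_strategy(),
//         || async {
//             let mut hummock_client = self.core.read().await.hummock_client.clone();
//             let rpc_res = hummock_client.get_new_sst_ids(request.clone()).await;
//             if rpc_res.is_err() {
//                 self.recreate_core().await;
//             }
//             rpc_res
//         },
//         Self::should_retry,
//     )
//     .await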