risingwave_rpc_client/
compactor_client.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use risingwave_common::monitor::EndpointExt;
19use risingwave_common::util::addr::HostAddr;
20use risingwave_pb::configured_monitor_service_client;
21use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient;
22use risingwave_pb::hummock::{
23 GetNewObjectIdsRequest, GetNewObjectIdsResponse, ReportCompactionTaskRequest,
24 ReportCompactionTaskResponse,
25};
26use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
27use risingwave_pb::meta::{GetSystemParamsRequest, GetSystemParamsResponse};
28use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
29use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
30use tokio::sync::RwLock;
31use tokio_retry::strategy::{ExponentialBackoff, jitter};
32use tonic::transport::{Channel, Endpoint};
33
34use crate::error::{Result, RpcError};
35use crate::retry_rpc;
/// HTTP/2 keep-alive ping interval for proxy channels, in seconds.
const ENDPOINT_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
/// How long to wait for a keep-alive ping acknowledgement before the
/// connection is considered dead, in seconds.
const ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC: u64 = 60;

/// Base delay of the exponential retry backoff, in milliseconds
/// (fed to `ExponentialBackoff::from_millis`).
const DEFAULT_RETRY_INTERVAL: u64 = 20;
/// Upper bound for a single backoff delay.
const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_secs(5);
/// Maximum number of retry attempts per RPC.
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 3;
/// Direct gRPC client to a compactor node, currently exposing only the
/// monitor service (e.g. stack-trace dumps).
#[derive(Clone)]
pub struct CompactorClient {
    /// Underlying tonic monitor-service client; cloned per call.
    pub monitor_client: MonitorServiceClient<Channel>,
}
46
47impl CompactorClient {
48 pub async fn new(host_addr: HostAddr) -> Result<Self> {
49 let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
50 .connect_timeout(Duration::from_secs(5))
51 .monitored_connect("grpc-compactor-client", Default::default())
52 .await?;
53 Ok(Self {
54 monitor_client: configured_monitor_service_client(MonitorServiceClient::new(channel)),
55 })
56 }
57
58 pub async fn stack_trace(&self, req: StackTraceRequest) -> Result<StackTraceResponse> {
59 Ok(self
60 .monitor_client
61 .clone()
62 .stack_trace(req)
63 .await
64 .map_err(RpcError::from_compactor_status)?
65 .into_inner())
66 }
67}
68
/// Inner state of [`GrpcCompactorProxyClient`]: the per-service tonic
/// clients built from a single channel. The whole core is replaced when
/// the channel is reconnected (see `recreate_core`).
#[derive(Debug, Clone)]
pub struct GrpcCompactorProxyClientCore {
    hummock_client: HummockManagerServiceClient<Channel>,
    system_params_client: SystemParamsServiceClient<Channel>,
}
74
75impl GrpcCompactorProxyClientCore {
76 pub(crate) fn new(channel: Channel) -> Self {
77 let hummock_client =
78 HummockManagerServiceClient::new(channel.clone()).max_decoding_message_size(usize::MAX);
79 let system_params_client = SystemParamsServiceClient::new(channel);
80
81 Self {
82 hummock_client,
83 system_params_client,
84 }
85 }
86}
87
/// gRPC client that reaches the Hummock manager and system-params services
/// through a proxy endpoint, transparently reconnecting and retrying on
/// failure.
#[derive(Debug, Clone)]
pub struct GrpcCompactorProxyClient {
    /// Service clients; swapped out under the write lock on reconnect.
    pub core: Arc<RwLock<GrpcCompactorProxyClientCore>>,
    /// Proxy endpoint URI, kept so the channel can be re-established.
    endpoint: String,
}
96
97impl GrpcCompactorProxyClient {
98 pub async fn new(endpoint: String) -> Self {
99 let channel = Self::connect_to_endpoint(endpoint.clone()).await;
100 let core = Arc::new(RwLock::new(GrpcCompactorProxyClientCore::new(channel)));
101 Self { core, endpoint }
102 }
103
104 async fn recreate_core(&self) {
105 tracing::info!("GrpcCompactorProxyClient rpc transfer failed, try to reconnect");
106 let channel = Self::connect_to_endpoint(self.endpoint.clone()).await;
107 let mut core = self.core.write().await;
108 *core = GrpcCompactorProxyClientCore::new(channel);
109 }
110
111 async fn connect_to_endpoint(endpoint: String) -> Channel {
112 let endpoint = Endpoint::from_shared(endpoint).expect("Fail to construct tonic Endpoint");
113 endpoint
114 .http2_keep_alive_interval(Duration::from_secs(ENDPOINT_KEEP_ALIVE_INTERVAL_SEC))
115 .keep_alive_timeout(Duration::from_secs(ENDPOINT_KEEP_ALIVE_TIMEOUT_SEC))
116 .connect_timeout(Duration::from_secs(5))
117 .monitored_connect("grpc-compactor-proxy-client", Default::default())
118 .await
119 .expect("Failed to create channel via proxy rpc endpoint.")
120 }
121
122 pub async fn get_new_sst_ids(
123 &self,
124 request: GetNewObjectIdsRequest,
125 ) -> std::result::Result<tonic::Response<GetNewObjectIdsResponse>, tonic::Status> {
126 retry_rpc!(self, get_new_object_ids, request, GetNewObjectIdsResponse)
127 }
128
129 pub async fn report_compaction_task(
130 &self,
131 request: ReportCompactionTaskRequest,
132 ) -> std::result::Result<tonic::Response<ReportCompactionTaskResponse>, tonic::Status> {
133 retry_rpc!(
134 self,
135 report_compaction_task,
136 request,
137 ReportCompactionTaskResponse
138 )
139 }
140
141 pub async fn get_system_params(
142 &self,
143 ) -> std::result::Result<tonic::Response<GetSystemParamsResponse>, tonic::Status> {
144 tokio_retry::RetryIf::spawn(
145 Self::get_retry_strategy(),
146 || async {
147 let mut system_params_client = self.core.read().await.system_params_client.clone();
148 let rpc_res = system_params_client
149 .get_system_params(GetSystemParamsRequest {})
150 .await;
151 if rpc_res.is_err() {
152 self.recreate_core().await;
153 }
154 rpc_res
155 },
156 Self::should_retry,
157 )
158 .await
159 }
160
161 #[inline(always)]
162 fn get_retry_strategy() -> impl Iterator<Item = Duration> {
163 ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL)
164 .max_delay(DEFAULT_RETRY_MAX_DELAY)
165 .take(DEFAULT_RETRY_MAX_ATTEMPTS)
166 .map(jitter)
167 }
168
169 #[inline(always)]
170 fn should_retry(status: &tonic::Status) -> bool {
171 if status.code() == tonic::Code::Unavailable
172 || status.code() == tonic::Code::Unknown
173 || (status.code() == tonic::Code::Unauthenticated
174 && status.message().contains("invalid auth token"))
175 {
176 return true;
177 }
178 false
179 }
180}
181
/// Retries a Hummock-manager RPC through a proxy client, recreating the
/// underlying connection after each failed attempt.
///
/// `$self` must expose a `core: Arc<RwLock<_>>` whose inner value has a
/// cloneable `hummock_client`, plus `recreate_core`, and the macro must be
/// expanded inside an impl providing `get_retry_strategy`/`should_retry`
/// (it refers to them via `Self::`) — as `GrpcCompactorProxyClient` does.
#[macro_export]
macro_rules! retry_rpc {
    ($self:expr, $rpc_call:ident, $request:expr, $response:ty) => {
        tokio_retry::RetryIf::spawn(
            Self::get_retry_strategy(),
            || async {
                // Clone the client out of the read guard so the lock is not
                // held across the RPC await.
                let mut hummock_client = $self.core.read().await.hummock_client.clone();
                // `$request` is cloned because each retry attempt needs to
                // consume its own copy.
                let rpc_res = hummock_client.$rpc_call($request.clone()).await;
                if rpc_res.is_err() {
                    // Re-establish the channel before the next attempt.
                    $self.recreate_core().await;
                }
                rpc_res
            },
            Self::should_retry,
        )
        .await
    };
}