risingwave_rpc_client/
frontend_client.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, RpcClientConfig};
20use risingwave_common::monitor::{EndpointExt, TcpConfig};
21use risingwave_common::util::addr::HostAddr;
22use risingwave_pb::frontend_service::frontend_service_client::FrontendServiceClient;
23use risingwave_pb::frontend_service::{
24 CancelRunningSqlRequest, CancelRunningSqlResponse, GetAllCursorsRequest, GetAllCursorsResponse,
25 GetAllSubCursorsRequest, GetAllSubCursorsResponse, GetRunningSqlsRequest,
26 GetRunningSqlsResponse, GetTableReplacePlanRequest, GetTableReplacePlanResponse,
27};
28use tokio_retry::strategy::{ExponentialBackoff, jitter};
29use tonic::Response;
30use tonic::transport::Endpoint;
31
32use crate::channel::{Channel, WrappedChannelExt};
33use crate::error::Result;
34use crate::{RpcClient, RpcClientPool};
35
/// Base of the exponential backoff used for retried RPCs, in milliseconds.
const DEFAULT_RETRY_INTERVAL: u64 = 50;
/// Upper bound applied to every individual backoff delay (5 seconds).
const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_millis(5_000);
/// Number of backoff delays taken from the strategy, i.e. the maximum number of
/// retries performed after the initial attempt.
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 10;
39
40#[derive(Clone)]
41struct FrontendClient(FrontendServiceClient<Channel>);
42
43impl FrontendClient {
44 async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
45 let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
46 .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
47 .connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
48 .monitored_connect(
49 "grpc-frontend-client",
50 TcpConfig {
51 tcp_nodelay: true,
52 ..Default::default()
53 },
54 )
55 .await?
56 .wrapped();
57
58 Ok(Self(
59 FrontendServiceClient::new(channel).max_decoding_message_size(usize::MAX),
60 ))
61 }
62}
63
64pub type FrontendClientPool = RpcClientPool<FrontendRetryClient>;
66pub type FrontendClientPoolRef = Arc<FrontendClientPool>;
67
68#[async_trait]
69impl RpcClient for FrontendRetryClient {
70 async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
71 Self::new(host_addr, opts).await
72 }
73}
74
75#[derive(Clone)]
76pub struct FrontendRetryClient {
77 client: FrontendClient,
78}
79
80impl FrontendRetryClient {
81 async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
82 let client = FrontendClient::new(host_addr, opts).await?;
83 Ok(Self { client })
84 }
85
86 #[inline(always)]
87 fn get_retry_strategy() -> impl Iterator<Item = Duration> {
88 ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL)
89 .max_delay(DEFAULT_RETRY_MAX_DELAY)
90 .take(DEFAULT_RETRY_MAX_ATTEMPTS)
91 .map(jitter)
92 }
93
94 fn should_retry(status: &tonic::Status) -> bool {
95 if status.code() == tonic::Code::Unavailable
96 || status.code() == tonic::Code::Unknown
97 || status.code() == tonic::Code::Unauthenticated
98 || status.code() == tonic::Code::Aborted
99 {
100 return true;
101 }
102 false
103 }
104
105 pub async fn get_table_replace_plan(
106 &self,
107 request: GetTableReplacePlanRequest,
108 ) -> std::result::Result<Response<GetTableReplacePlanResponse>, tonic::Status> {
109 tokio_retry::RetryIf::spawn(
110 Self::get_retry_strategy(),
111 || async {
112 self.client
113 .clone()
114 .0
115 .get_table_replace_plan(request.clone())
116 .await
117 },
118 Self::should_retry,
119 )
120 .await
121 }
122
123 pub async fn get_running_sqls(
124 &self,
125 request: GetRunningSqlsRequest,
126 ) -> std::result::Result<Response<GetRunningSqlsResponse>, tonic::Status> {
127 self.client.0.clone().get_running_sqls(request).await
128 }
129
130 pub async fn get_all_cursors(
131 &self,
132 request: GetAllCursorsRequest,
133 ) -> std::result::Result<Response<GetAllCursorsResponse>, tonic::Status> {
134 self.client.0.clone().get_all_cursors(request).await
135 }
136
137 pub async fn get_all_sub_cursors(
138 &self,
139 request: GetAllSubCursorsRequest,
140 ) -> std::result::Result<Response<GetAllSubCursorsResponse>, tonic::Status> {
141 self.client.0.clone().get_all_sub_cursors(request).await
142 }
143
144 pub async fn cancel_running_sql(
145 &self,
146 request: CancelRunningSqlRequest,
147 ) -> std::result::Result<Response<CancelRunningSqlResponse>, tonic::Status> {
148 self.client.0.clone().cancel_running_sql(request).await
149 }
150}