risingwave_rpc_client/
frontend_client.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, RpcClientConfig};
20use risingwave_common::monitor::{EndpointExt, TcpConfig};
21use risingwave_common::util::addr::HostAddr;
22use risingwave_pb::frontend_service::frontend_service_client::FrontendServiceClient;
23use risingwave_pb::frontend_service::{
24 CancelRunningSqlRequest, CancelRunningSqlResponse, GetRunningSqlsRequest,
25 GetRunningSqlsResponse, GetTableReplacePlanRequest, GetTableReplacePlanResponse,
26};
27use tokio_retry::strategy::{ExponentialBackoff, jitter};
28use tonic::Response;
29use tonic::transport::Endpoint;
30
31use crate::channel::{Channel, WrappedChannelExt};
32use crate::error::Result;
33use crate::{RpcClient, RpcClientPool};
34
/// Value handed to `ExponentialBackoff::from_millis` when the retry schedule is built
/// in `FrontendRetryClient::get_retry_strategy`.
// NOTE(review): tokio-retry documents the n-th delay as `base^n` milliseconds, so this
// acts as an exponent base rather than a fixed interval — confirm that is intended.
const DEFAULT_RETRY_INTERVAL: u64 = 50;
/// Cap applied to each individual back-off delay through `max_delay`.
const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_secs(5);
/// Number of delays taken from the back-off iterator (`take`), which bounds how many
/// times a failed call is retried.
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 10;
38
/// Newtype around the generated [`FrontendServiceClient`] bound to this crate's [`Channel`].
///
/// The type is private to this module; [`FrontendRetryClient`] is the public wrapper
/// that holds it.
#[derive(Clone)]
struct FrontendClient(FrontendServiceClient<Channel>);
41
42impl FrontendClient {
43 async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
44 let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
45 .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
46 .connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
47 .monitored_connect(
48 "grpc-frontend-client",
49 TcpConfig {
50 tcp_nodelay: true,
51 ..Default::default()
52 },
53 )
54 .await?
55 .wrapped();
56
57 Ok(Self(
58 FrontendServiceClient::new(channel).max_decoding_message_size(usize::MAX),
59 ))
60 }
61}
62
/// An [`RpcClientPool`] whose pooled clients are [`FrontendRetryClient`]s.
pub type FrontendClientPool = RpcClientPool<FrontendRetryClient>;
/// Reference-counted ([`Arc`]) handle to a [`FrontendClientPool`].
pub type FrontendClientPoolRef = Arc<FrontendClientPool>;
66
67#[async_trait]
68impl RpcClient for FrontendRetryClient {
69 async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
70 Self::new(host_addr, opts).await
71 }
72}
73
/// Public client for the frontend gRPC service.
///
/// Wraps a [`FrontendClient`] and exposes the service calls as async methods.
#[derive(Clone)]
pub struct FrontendRetryClient {
    /// Underlying connection wrapper; it is cloned before each RPC is issued.
    client: FrontendClient,
}
78
79impl FrontendRetryClient {
80 async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
81 let client = FrontendClient::new(host_addr, opts).await?;
82 Ok(Self { client })
83 }
84
85 #[inline(always)]
86 fn get_retry_strategy() -> impl Iterator<Item = Duration> {
87 ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL)
88 .max_delay(DEFAULT_RETRY_MAX_DELAY)
89 .take(DEFAULT_RETRY_MAX_ATTEMPTS)
90 .map(jitter)
91 }
92
93 fn should_retry(status: &tonic::Status) -> bool {
94 if status.code() == tonic::Code::Unavailable
95 || status.code() == tonic::Code::Unknown
96 || status.code() == tonic::Code::Unauthenticated
97 || status.code() == tonic::Code::Aborted
98 {
99 return true;
100 }
101 false
102 }
103
104 pub async fn get_table_replace_plan(
105 &self,
106 request: GetTableReplacePlanRequest,
107 ) -> std::result::Result<Response<GetTableReplacePlanResponse>, tonic::Status> {
108 tokio_retry::RetryIf::spawn(
109 Self::get_retry_strategy(),
110 || async {
111 self.client
112 .to_owned()
113 .0
114 .get_table_replace_plan(request.clone())
115 .await
116 },
117 Self::should_retry,
118 )
119 .await
120 }
121
122 pub async fn get_running_sqls(
123 &self,
124 request: GetRunningSqlsRequest,
125 ) -> std::result::Result<Response<GetRunningSqlsResponse>, tonic::Status> {
126 self.client.0.to_owned().get_running_sqls(request).await
127 }
128
129 pub async fn cancel_running_sql(
130 &self,
131 request: CancelRunningSqlRequest,
132 ) -> std::result::Result<Response<CancelRunningSqlResponse>, tonic::Status> {
133 self.client.0.to_owned().cancel_running_sql(request).await
134 }
135}