risingwave_rpc_client/
frontend_client.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, RpcClientConfig};
20use risingwave_common::monitor::{EndpointExt, TcpConfig};
21use risingwave_common::util::addr::HostAddr;
22use risingwave_pb::frontend_service::frontend_service_client::FrontendServiceClient;
23use risingwave_pb::frontend_service::{GetTableReplacePlanRequest, GetTableReplacePlanResponse};
24use tokio_retry::strategy::{ExponentialBackoff, jitter};
25use tonic::Response;
26use tonic::transport::Endpoint;
27
28use crate::channel::{Channel, WrappedChannelExt};
29use crate::error::Result;
30use crate::{RpcClient, RpcClientPool};
31
/// Millisecond value handed to `ExponentialBackoff::from_millis` when building the retry
/// strategy.
const DEFAULT_RETRY_INTERVAL: u64 = 50;
/// Upper bound applied to every backoff delay via `max_delay`.
const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_secs(5);
/// Number of delays taken from the backoff iterator, i.e. how many times a failed call may be
/// retried.
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 10;
35
36#[derive(Clone)]
37struct FrontendClient(FrontendServiceClient<Channel>);
38
39impl FrontendClient {
40 async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
41 let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
42 .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
43 .connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
44 .monitored_connect(
45 "grpc-frontend-client",
46 TcpConfig {
47 tcp_nodelay: true,
48 ..Default::default()
49 },
50 )
51 .await?
52 .wrapped();
53
54 Ok(Self(
55 FrontendServiceClient::new(channel).max_decoding_message_size(usize::MAX),
56 ))
57 }
58}
59
60pub type FrontendClientPool = RpcClientPool<FrontendRetryClient>;
62pub type FrontendClientPoolRef = Arc<FrontendClientPool>;
63
64#[async_trait]
65impl RpcClient for FrontendRetryClient {
66 async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
67 Self::new(host_addr, opts).await
68 }
69}
70
71#[derive(Clone)]
72pub struct FrontendRetryClient {
73 client: FrontendClient,
74}
75
76impl FrontendRetryClient {
77 async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
78 let client = FrontendClient::new(host_addr, opts).await?;
79 Ok(Self { client })
80 }
81
82 #[inline(always)]
83 fn get_retry_strategy() -> impl Iterator<Item = Duration> {
84 ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL)
85 .max_delay(DEFAULT_RETRY_MAX_DELAY)
86 .take(DEFAULT_RETRY_MAX_ATTEMPTS)
87 .map(jitter)
88 }
89
90 fn should_retry(status: &tonic::Status) -> bool {
91 if status.code() == tonic::Code::Unavailable
92 || status.code() == tonic::Code::Unknown
93 || status.code() == tonic::Code::Unauthenticated
94 || status.code() == tonic::Code::Aborted
95 {
96 return true;
97 }
98 false
99 }
100
101 pub async fn get_table_replace_plan(
102 &self,
103 request: GetTableReplacePlanRequest,
104 ) -> std::result::Result<Response<GetTableReplacePlanResponse>, tonic::Status> {
105 tokio_retry::RetryIf::spawn(
106 Self::get_retry_strategy(),
107 || async {
108 self.client
109 .to_owned()
110 .0
111 .get_table_replace_plan(request.clone())
112 .await
113 },
114 Self::should_retry,
115 )
116 .await
117 }
118}