risingwave_rpc_client/
frontend_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, RpcClientConfig};
20use risingwave_common::monitor::{EndpointExt, TcpConfig};
21use risingwave_common::util::addr::HostAddr;
22use risingwave_pb::frontend_service::frontend_service_client::FrontendServiceClient;
23use risingwave_pb::frontend_service::{GetTableReplacePlanRequest, GetTableReplacePlanResponse};
24use tokio_retry::strategy::{ExponentialBackoff, jitter};
25use tonic::Response;
26use tonic::transport::Endpoint;
27
28use crate::channel::{Channel, WrappedChannelExt};
29use crate::error::Result;
30use crate::{RpcClient, RpcClientPool};
31
/// Maximum number of *retries* of a frontend RPC; the initial attempt is not counted.
///
/// Used as the `take` bound on the back-off iterator, so an RPC runs at most
/// `DEFAULT_RETRY_MAX_ATTEMPTS + 1` times in total.
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 10;
/// Base (in milliseconds) passed to `ExponentialBackoff::from_millis`.
///
/// NOTE: per `tokio_retry`'s documented semantics the delay is `base^n` ms, not `base * 2^n`.
/// With a base of 50, the un-jittered delays are 50 ms, 2.5 s, and then the cap below.
const DEFAULT_RETRY_INTERVAL: u64 = 50;
/// Upper bound applied to every individual back-off delay.
const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_millis(5_000);
35
36#[derive(Clone)]
37struct FrontendClient(FrontendServiceClient<Channel>);
38
39impl FrontendClient {
40    async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
41        let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
42            .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
43            .connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
44            .monitored_connect(
45                "grpc-frontend-client",
46                TcpConfig {
47                    tcp_nodelay: true,
48                    ..Default::default()
49                },
50            )
51            .await?
52            .wrapped();
53
54        Ok(Self(
55            FrontendServiceClient::new(channel).max_decoding_message_size(usize::MAX),
56        ))
57    }
58}
59
60// similar to the stream_client used in the Meta node
61pub type FrontendClientPool = RpcClientPool<FrontendRetryClient>;
62pub type FrontendClientPoolRef = Arc<FrontendClientPool>;
63
64#[async_trait]
65impl RpcClient for FrontendRetryClient {
66    async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
67        Self::new(host_addr, opts).await
68    }
69}
70
71#[derive(Clone)]
72pub struct FrontendRetryClient {
73    client: FrontendClient,
74}
75
76impl FrontendRetryClient {
77    async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
78        let client = FrontendClient::new(host_addr, opts).await?;
79        Ok(Self { client })
80    }
81
82    #[inline(always)]
83    fn get_retry_strategy() -> impl Iterator<Item = Duration> {
84        ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL)
85            .max_delay(DEFAULT_RETRY_MAX_DELAY)
86            .take(DEFAULT_RETRY_MAX_ATTEMPTS)
87            .map(jitter)
88    }
89
90    fn should_retry(status: &tonic::Status) -> bool {
91        if status.code() == tonic::Code::Unavailable
92            || status.code() == tonic::Code::Unknown
93            || status.code() == tonic::Code::Unauthenticated
94            || status.code() == tonic::Code::Aborted
95        {
96            return true;
97        }
98        false
99    }
100
101    pub async fn get_table_replace_plan(
102        &self,
103        request: GetTableReplacePlanRequest,
104    ) -> std::result::Result<Response<GetTableReplacePlanResponse>, tonic::Status> {
105        tokio_retry::RetryIf::spawn(
106            Self::get_retry_strategy(),
107            || async {
108                self.client
109                    .to_owned()
110                    .0
111                    .get_table_replace_plan(request.clone())
112                    .await
113            },
114            Self::should_retry,
115        )
116        .await
117    }
118}