risingwave_rpc_client/
frontend_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, RpcClientConfig};
20use risingwave_common::monitor::{EndpointExt, TcpConfig};
21use risingwave_common::util::addr::HostAddr;
22use risingwave_pb::frontend_service::frontend_service_client::FrontendServiceClient;
23use risingwave_pb::frontend_service::{
24    CancelRunningSqlRequest, CancelRunningSqlResponse, GetRunningSqlsRequest,
25    GetRunningSqlsResponse, GetTableReplacePlanRequest, GetTableReplacePlanResponse,
26};
27use tokio_retry::strategy::{ExponentialBackoff, jitter};
28use tonic::Response;
29use tonic::transport::Endpoint;
30
31use crate::channel::{Channel, WrappedChannelExt};
32use crate::error::Result;
33use crate::{RpcClient, RpcClientPool};
34
/// Base, in milliseconds, handed to `ExponentialBackoff::from_millis` when the
/// retry schedule is built. tokio_retry raises this base to the power of the
/// attempt number, so it is not a fixed interval.
const DEFAULT_RETRY_INTERVAL: u64 = 50;
/// Cap applied to every individual backoff delay (before jitter).
const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_millis(5_000);
/// Number of delays taken from the backoff iterator. Each delay allows one more
/// attempt after the initial call, so this bounds the number of *retries*.
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 10;
38
39#[derive(Clone)]
40struct FrontendClient(FrontendServiceClient<Channel>);
41
42impl FrontendClient {
43    async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
44        let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
45            .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
46            .connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
47            .monitored_connect(
48                "grpc-frontend-client",
49                TcpConfig {
50                    tcp_nodelay: true,
51                    ..Default::default()
52                },
53            )
54            .await?
55            .wrapped();
56
57        Ok(Self(
58            FrontendServiceClient::new(channel).max_decoding_message_size(usize::MAX),
59        ))
60    }
61}
62
63// similar to the stream_client used in the Meta node
64pub type FrontendClientPool = RpcClientPool<FrontendRetryClient>;
65pub type FrontendClientPoolRef = Arc<FrontendClientPool>;
66
67#[async_trait]
68impl RpcClient for FrontendRetryClient {
69    async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
70        Self::new(host_addr, opts).await
71    }
72}
73
74#[derive(Clone)]
75pub struct FrontendRetryClient {
76    client: FrontendClient,
77}
78
79impl FrontendRetryClient {
80    async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
81        let client = FrontendClient::new(host_addr, opts).await?;
82        Ok(Self { client })
83    }
84
85    #[inline(always)]
86    fn get_retry_strategy() -> impl Iterator<Item = Duration> {
87        ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL)
88            .max_delay(DEFAULT_RETRY_MAX_DELAY)
89            .take(DEFAULT_RETRY_MAX_ATTEMPTS)
90            .map(jitter)
91    }
92
93    fn should_retry(status: &tonic::Status) -> bool {
94        if status.code() == tonic::Code::Unavailable
95            || status.code() == tonic::Code::Unknown
96            || status.code() == tonic::Code::Unauthenticated
97            || status.code() == tonic::Code::Aborted
98        {
99            return true;
100        }
101        false
102    }
103
104    pub async fn get_table_replace_plan(
105        &self,
106        request: GetTableReplacePlanRequest,
107    ) -> std::result::Result<Response<GetTableReplacePlanResponse>, tonic::Status> {
108        tokio_retry::RetryIf::spawn(
109            Self::get_retry_strategy(),
110            || async {
111                self.client
112                    .to_owned()
113                    .0
114                    .get_table_replace_plan(request.clone())
115                    .await
116            },
117            Self::should_retry,
118        )
119        .await
120    }
121
122    pub async fn get_running_sqls(
123        &self,
124        request: GetRunningSqlsRequest,
125    ) -> std::result::Result<Response<GetRunningSqlsResponse>, tonic::Status> {
126        self.client.0.to_owned().get_running_sqls(request).await
127    }
128
129    pub async fn cancel_running_sql(
130        &self,
131        request: CancelRunningSqlRequest,
132    ) -> std::result::Result<Response<CancelRunningSqlResponse>, tonic::Status> {
133        self.client.0.to_owned().cancel_running_sql(request).await
134    }
135}