risingwave_rpc_client/
frontend_client.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, RpcClientConfig};
20use risingwave_common::monitor::{EndpointExt, TcpConfig};
21use risingwave_common::util::addr::HostAddr;
22use risingwave_pb::frontend_service::frontend_service_client::FrontendServiceClient;
23use risingwave_pb::frontend_service::{
24    CancelRunningSqlRequest, CancelRunningSqlResponse, GetAllCursorsRequest, GetAllCursorsResponse,
25    GetAllSubCursorsRequest, GetAllSubCursorsResponse, GetRunningSqlsRequest,
26    GetRunningSqlsResponse, GetTableReplacePlanRequest, GetTableReplacePlanResponse,
27};
28use tokio_retry::strategy::{ExponentialBackoff, jitter};
29use tonic::Response;
30use tonic::transport::Endpoint;
31
32use crate::channel::{Channel, WrappedChannelExt};
33use crate::error::Result;
34use crate::{RpcClient, RpcClientPool};
35
// Base value, in milliseconds, handed to `ExponentialBackoff::from_millis` when the retry
// schedule is built in `FrontendRetryClient::get_retry_strategy`.
const DEFAULT_RETRY_INTERVAL: u64 = 50;
// Upper bound passed to `ExponentialBackoff::max_delay`; it caps every delay of the schedule
// before jitter is applied.
const DEFAULT_RETRY_MAX_DELAY: Duration = Duration::from_secs(5);
// Number of delays taken from the backoff iterator, which bounds how many times a failed call
// can be retried.
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 10;
39
40#[derive(Clone)]
41struct FrontendClient(FrontendServiceClient<Channel>);
42
43impl FrontendClient {
44    async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
45        let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
46            .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
47            .connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
48            .monitored_connect(
49                "grpc-frontend-client",
50                TcpConfig {
51                    tcp_nodelay: true,
52                    ..Default::default()
53                },
54            )
55            .await?
56            .wrapped();
57
58        Ok(Self(
59            FrontendServiceClient::new(channel).max_decoding_message_size(usize::MAX),
60        ))
61    }
62}
63
/// Pool of [`FrontendRetryClient`]s; similar to the stream_client used in the Meta node.
pub type FrontendClientPool = RpcClientPool<FrontendRetryClient>;
/// Reference-counted, shareable handle to a [`FrontendClientPool`].
pub type FrontendClientPoolRef = Arc<FrontendClientPool>;
67
68#[async_trait]
69impl RpcClient for FrontendRetryClient {
70    async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
71        Self::new(host_addr, opts).await
72    }
73}
74
75#[derive(Clone)]
76pub struct FrontendRetryClient {
77    client: FrontendClient,
78}
79
80impl FrontendRetryClient {
81    async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
82        let client = FrontendClient::new(host_addr, opts).await?;
83        Ok(Self { client })
84    }
85
86    #[inline(always)]
87    fn get_retry_strategy() -> impl Iterator<Item = Duration> {
88        ExponentialBackoff::from_millis(DEFAULT_RETRY_INTERVAL)
89            .max_delay(DEFAULT_RETRY_MAX_DELAY)
90            .take(DEFAULT_RETRY_MAX_ATTEMPTS)
91            .map(jitter)
92    }
93
94    fn should_retry(status: &tonic::Status) -> bool {
95        if status.code() == tonic::Code::Unavailable
96            || status.code() == tonic::Code::Unknown
97            || status.code() == tonic::Code::Unauthenticated
98            || status.code() == tonic::Code::Aborted
99        {
100            return true;
101        }
102        false
103    }
104
105    pub async fn get_table_replace_plan(
106        &self,
107        request: GetTableReplacePlanRequest,
108    ) -> std::result::Result<Response<GetTableReplacePlanResponse>, tonic::Status> {
109        tokio_retry::RetryIf::spawn(
110            Self::get_retry_strategy(),
111            || async {
112                self.client
113                    .clone()
114                    .0
115                    .get_table_replace_plan(request.clone())
116                    .await
117            },
118            Self::should_retry,
119        )
120        .await
121    }
122
123    pub async fn get_running_sqls(
124        &self,
125        request: GetRunningSqlsRequest,
126    ) -> std::result::Result<Response<GetRunningSqlsResponse>, tonic::Status> {
127        self.client.0.clone().get_running_sqls(request).await
128    }
129
130    pub async fn get_all_cursors(
131        &self,
132        request: GetAllCursorsRequest,
133    ) -> std::result::Result<Response<GetAllCursorsResponse>, tonic::Status> {
134        self.client.0.clone().get_all_cursors(request).await
135    }
136
137    pub async fn get_all_sub_cursors(
138        &self,
139        request: GetAllSubCursorsRequest,
140    ) -> std::result::Result<Response<GetAllSubCursorsResponse>, tonic::Status> {
141        self.client.0.clone().get_all_sub_cursors(request).await
142    }
143
144    pub async fn cancel_running_sql(
145        &self,
146        request: CancelRunningSqlRequest,
147    ) -> std::result::Result<Response<CancelRunningSqlResponse>, tonic::Status> {
148        self.client.0.clone().cancel_running_sql(request).await
149    }
150}