risingwave_rpc_client/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Wrapper gRPC clients, which help construct the request and destructure the
//! response gRPC message structs.
17
18#![feature(trait_alias)]
19#![feature(type_alias_impl_trait)]
20#![feature(associated_type_defaults)]
21#![feature(coroutines)]
22#![feature(iterator_try_collect)]
23#![feature(try_blocks)]
24#![feature(let_chains)]
25#![feature(impl_trait_in_assoc_type)]
26#![feature(error_generic_member_access)]
27#![feature(panic_update_hook)]
28#![feature(negative_impls)]
29
30use std::any::type_name;
31use std::fmt::{Debug, Formatter};
32use std::future::Future;
33use std::str::FromStr;
34use std::sync::Arc;
35
36use anyhow::{Context, anyhow};
37use async_trait::async_trait;
38pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
39pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
40pub use connector_client::{SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
41use error::Result;
42pub use frontend_client::{FrontendClientPool, FrontendClientPoolRef};
43use futures::future::try_join_all;
44use futures::stream::{BoxStream, Peekable};
45use futures::{Stream, StreamExt};
46pub use hummock_meta_client::{
47    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
48    IcebergCompactionEventItem,
49};
50pub use meta_client::{MetaClient, SinkCoordinationRpcClient};
51use moka::future::Cache;
52use rand::prelude::IndexedRandom;
53use risingwave_common::config::RpcClientConfig;
54use risingwave_common::util::addr::HostAddr;
55use risingwave_pb::common::{WorkerNode, WorkerType};
56use rw_futures_util::await_future_with_monitor_error_stream;
57pub use sink_coordinate_client::CoordinatorStreamHandle;
58pub use stream_client::{
59    StreamClient, StreamClientPool, StreamClientPoolRef, StreamingControlHandle,
60};
61use tokio::sync::mpsc::{
62    Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
63};
64
65pub mod error;
66
67mod channel;
68mod compactor_client;
69mod compute_client;
70mod connector_client;
71mod frontend_client;
72mod hummock_meta_client;
73mod meta_client;
74mod sink_coordinate_client;
75mod stream_client;
76
77#[async_trait]
78pub trait RpcClient: Send + Sync + 'static + Clone {
79    async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self>;
80
81    async fn new_clients(
82        host_addr: HostAddr,
83        size: usize,
84        opts: &RpcClientConfig,
85    ) -> Result<Arc<Vec<Self>>> {
86        try_join_all(
87            std::iter::repeat_n(host_addr, size).map(|host_addr| Self::new_client(host_addr, opts)),
88        )
89        .await
90        .map(Arc::new)
91    }
92}
93
94#[derive(Clone)]
95pub struct RpcClientPool<S> {
96    connection_pool_size: u16,
97
98    clients: Cache<HostAddr, Arc<Vec<S>>>,
99
100    opts: RpcClientConfig,
101}
102
103impl<S> std::fmt::Debug for RpcClientPool<S> {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        f.debug_struct("RpcClientPool")
106            .field("connection_pool_size", &self.connection_pool_size)
107            .field("type", &type_name::<S>())
108            .field("len", &self.clients.entry_count())
109            .finish()
110    }
111}
112
113/// Intentionally not implementing `Default` to let callers be explicit about the pool size.
114impl<S> !Default for RpcClientPool<S> {}
115
116impl<S> RpcClientPool<S>
117where
118    S: RpcClient,
119{
120    /// Create a new pool with the given `connection_pool_size`, which is the number of
121    /// connections to each node that will be reused.
122    pub fn new(connection_pool_size: u16, opts: RpcClientConfig) -> Self {
123        Self {
124            connection_pool_size,
125            clients: Cache::new(u64::MAX),
126            opts,
127        }
128    }
129
130    /// Create a pool for testing purposes. Same as [`Self::adhoc`].
131    pub fn for_test() -> Self {
132        Self::adhoc()
133    }
134
135    /// Create a pool for ad-hoc usage, where the number of connections to each node is 1.
136    pub fn adhoc() -> Self {
137        Self::new(1, RpcClientConfig::default())
138    }
139
140    /// Gets the RPC client for the given node. If the connection is not established, a
141    /// new client will be created and returned.
142    pub async fn get(&self, node: &WorkerNode) -> Result<S> {
143        let addr = if node.get_type().unwrap() == WorkerType::Frontend {
144            let prop = node
145                .property
146                .as_ref()
147                .expect("frontend node property is missing");
148            HostAddr::from_str(prop.internal_rpc_host_addr.as_str())?
149        } else {
150            node.get_host().unwrap().into()
151        };
152
153        self.get_by_addr(addr).await
154    }
155
156    /// Gets the RPC client for the given addr. If the connection is not established, a
157    /// new client will be created and returned.
158    pub async fn get_by_addr(&self, addr: HostAddr) -> Result<S> {
159        Ok(self
160            .clients
161            .try_get_with(
162                addr.clone(),
163                S::new_clients(addr.clone(), self.connection_pool_size as usize, &self.opts),
164            )
165            .await
166            .with_context(|| format!("failed to create RPC client to {addr}"))?
167            .choose(&mut rand::rng())
168            .unwrap()
169            .clone())
170    }
171
172    pub fn invalidate_all(&self) {
173        self.clients.invalidate_all()
174    }
175}
176
/// Generates `pub async fn` wrappers around the methods of a client stored in `self`.
///
/// Each `{ client, fn_name, RequestType, ResponseType }` entry expands to a method named
/// `fn_name` that calls the method of the same name on a copy of `self.client`, converts a
/// failure with `RpcError::from_stream_status`, and unwraps the response with `into_inner`.
#[macro_export]
macro_rules! stream_rpc_client_method_impl {
    ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
        $(
            pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
                let response = self
                    .$client
                    .to_owned()
                    .$fn_name(request)
                    .await
                    .map_err($crate::error::RpcError::from_stream_status)?;
                Ok(response.into_inner())
            }
        )*
    }
}
193
/// Generates `pub async fn` wrappers around the methods of a client stored in `self.core`.
///
/// Each `{ client, fn_name, RequestType, ResponseType }` entry expands to a method named
/// `fn_name` that copies the client out of `self.core` under its read guard and calls the
/// method of the same name on it. On failure, `self.refresh_client_if_needed` is invoked
/// with the status code before the status is converted with `RpcError::from_meta_status`.
#[macro_export]
macro_rules! meta_rpc_client_method_impl {
    ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
        $(
            pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
                let mut client = self.core.read().await.$client.to_owned();
                let status = match client.$fn_name(request).await {
                    Ok(response) => return Ok(response.into_inner()),
                    Err(status) => status,
                };
                // Only the failure path reaches this point.
                self.refresh_client_if_needed(status.code()).await;
                Err($crate::error::RpcError::from_meta_status(status))
            }
        )*
    }
}
211
/// Capacity of the bounded request channel created by [`BidiStreamHandle::initialize`].
pub const DEFAULT_BUFFER_SIZE: usize = 16;
213
214pub struct BidiStreamSender<REQ> {
215    tx: Sender<REQ>,
216}
217
218impl<REQ> BidiStreamSender<REQ> {
219    pub async fn send_request<R: Into<REQ>>(&mut self, request: R) -> Result<()> {
220        self.tx
221            .send(request.into())
222            .await
223            .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
224    }
225}
226
227pub struct BidiStreamReceiver<RSP> {
228    pub stream: Peekable<BoxStream<'static, Result<RSP>>>,
229}
230
231impl<RSP> BidiStreamReceiver<RSP> {
232    pub async fn next_response(&mut self) -> Result<RSP> {
233        self.stream
234            .next()
235            .await
236            .ok_or_else(|| anyhow!("end of response stream"))?
237    }
238}
239
240pub struct BidiStreamHandle<REQ, RSP> {
241    pub request_sender: BidiStreamSender<REQ>,
242    pub response_stream: BidiStreamReceiver<RSP>,
243}
244
245impl<REQ, RSP> Debug for BidiStreamHandle<REQ, RSP> {
246    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
247        f.write_str(type_name::<Self>())
248    }
249}
250
251impl<REQ, RSP> BidiStreamHandle<REQ, RSP> {
252    pub fn for_test(
253        request_sender: Sender<REQ>,
254        response_stream: BoxStream<'static, Result<RSP>>,
255    ) -> Self {
256        Self {
257            request_sender: BidiStreamSender { tx: request_sender },
258            response_stream: BidiStreamReceiver {
259                stream: response_stream.peekable(),
260            },
261        }
262    }
263
264    pub async fn initialize<
265        F: FnOnce(Receiver<REQ>) -> Fut,
266        St: Stream<Item = Result<RSP>> + Send + Unpin + 'static,
267        Fut: Future<Output = Result<St>> + Send,
268        R: Into<REQ>,
269    >(
270        first_request: R,
271        init_stream_fn: F,
272    ) -> Result<(Self, RSP)> {
273        let (request_sender, request_receiver) = channel(DEFAULT_BUFFER_SIZE);
274
275        // Send initial request in case of the blocking receive call from creating streaming request
276        request_sender
277            .send(first_request.into())
278            .await
279            .map_err(|_err| anyhow!("unable to send first request of {}", type_name::<REQ>()))?;
280
281        let mut response_stream = init_stream_fn(request_receiver).await?;
282
283        let first_response = response_stream
284            .next()
285            .await
286            .ok_or_else(|| anyhow!("get empty response from first request"))??;
287
288        Ok((
289            Self {
290                request_sender: BidiStreamSender { tx: request_sender },
291                response_stream: BidiStreamReceiver {
292                    stream: response_stream.boxed().peekable(),
293                },
294            },
295            first_response,
296        ))
297    }
298
299    pub async fn next_response(&mut self) -> Result<RSP> {
300        self.response_stream.next_response().await
301    }
302
303    pub async fn send_request(&mut self, request: REQ) -> Result<()> {
304        match await_future_with_monitor_error_stream(
305            &mut self.response_stream.stream,
306            self.request_sender.send_request(request),
307        )
308        .await
309        {
310            Ok(send_result) => send_result,
311            Err(None) => Err(anyhow!("end of response stream").into()),
312            Err(Some(e)) => Err(e),
313        }
314    }
315}
316
317/// The handle of a bidi-stream started from the rpc client. It is similar to the `BidiStreamHandle`
318/// except that its sender is unbounded.
319pub struct UnboundedBidiStreamHandle<REQ, RSP> {
320    pub request_sender: UnboundedSender<REQ>,
321    pub response_stream: BoxStream<'static, Result<RSP>>,
322}
323
324impl<REQ, RSP> Debug for UnboundedBidiStreamHandle<REQ, RSP> {
325    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
326        f.write_str(type_name::<Self>())
327    }
328}
329
330impl<REQ, RSP> UnboundedBidiStreamHandle<REQ, RSP> {
331    pub async fn initialize<
332        F: FnOnce(UnboundedReceiver<REQ>) -> Fut,
333        St: Stream<Item = Result<RSP>> + Send + Unpin + 'static,
334        Fut: Future<Output = Result<St>> + Send,
335        R: Into<REQ>,
336    >(
337        first_request: R,
338        init_stream_fn: F,
339    ) -> Result<(Self, RSP)> {
340        let (request_sender, request_receiver) = unbounded_channel();
341
342        // Send initial request in case of the blocking receive call from creating streaming request
343        request_sender
344            .send(first_request.into())
345            .map_err(|_err| anyhow!("unable to send first request of {}", type_name::<REQ>()))?;
346
347        let mut response_stream = init_stream_fn(request_receiver).await?;
348
349        let first_response = response_stream
350            .next()
351            .await
352            .context("get empty response from first request")??;
353
354        Ok((
355            Self {
356                request_sender,
357                response_stream: response_stream.boxed(),
358            },
359            first_response,
360        ))
361    }
362
363    pub async fn next_response(&mut self) -> Result<RSP> {
364        self.response_stream
365            .next()
366            .await
367            .ok_or_else(|| anyhow!("end of response stream"))?
368    }
369
370    pub fn send_request(&mut self, request: REQ) -> Result<()> {
371        self.request_sender
372            .send(request)
373            .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
374    }
375}