risingwave_rpc_client/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Wrapper gRPC clients, which help construct the request and destructure the
//! response gRPC message structs.
17
18#![feature(trait_alias)]
19#![feature(type_alias_impl_trait)]
20#![feature(associated_type_defaults)]
21#![feature(coroutines)]
22#![feature(iterator_try_collect)]
23#![feature(try_blocks)]
24#![feature(let_chains)]
25#![feature(impl_trait_in_assoc_type)]
26#![feature(error_generic_member_access)]
27#![feature(panic_update_hook)]
28#![feature(negative_impls)]
29
30use std::any::type_name;
31use std::fmt::{Debug, Formatter};
32use std::future::Future;
33use std::str::FromStr;
34use std::sync::Arc;
35
36use anyhow::{Context, anyhow};
37use async_trait::async_trait;
38pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
39pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
40pub use connector_client::{SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
41use error::Result;
42pub use frontend_client::{FrontendClientPool, FrontendClientPoolRef};
43use futures::future::try_join_all;
44use futures::stream::{BoxStream, Peekable};
45use futures::{Stream, StreamExt};
46pub use hummock_meta_client::{
47    CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
48};
49pub use meta_client::{MetaClient, SinkCoordinationRpcClient};
50use moka::future::Cache;
51use rand::prelude::IndexedRandom;
52use risingwave_common::config::RpcClientConfig;
53use risingwave_common::util::addr::HostAddr;
54use risingwave_pb::common::{WorkerNode, WorkerType};
55use rw_futures_util::await_future_with_monitor_error_stream;
56pub use sink_coordinate_client::CoordinatorStreamHandle;
57pub use stream_client::{
58    StreamClient, StreamClientPool, StreamClientPoolRef, StreamingControlHandle,
59};
60use tokio::sync::mpsc::{
61    Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
62};
63
64pub mod error;
65
66mod channel;
67mod compactor_client;
68mod compute_client;
69mod connector_client;
70mod frontend_client;
71mod hummock_meta_client;
72mod meta_client;
73mod sink_coordinate_client;
74mod stream_client;
75
76#[async_trait]
77pub trait RpcClient: Send + Sync + 'static + Clone {
78    async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self>;
79
80    async fn new_clients(
81        host_addr: HostAddr,
82        size: usize,
83        opts: &RpcClientConfig,
84    ) -> Result<Arc<Vec<Self>>> {
85        try_join_all(
86            std::iter::repeat_n(host_addr, size).map(|host_addr| Self::new_client(host_addr, opts)),
87        )
88        .await
89        .map(Arc::new)
90    }
91}
92
93#[derive(Clone)]
94pub struct RpcClientPool<S> {
95    connection_pool_size: u16,
96
97    clients: Cache<HostAddr, Arc<Vec<S>>>,
98
99    opts: RpcClientConfig,
100}
101
102impl<S> std::fmt::Debug for RpcClientPool<S> {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct("RpcClientPool")
105            .field("connection_pool_size", &self.connection_pool_size)
106            .field("type", &type_name::<S>())
107            .field("len", &self.clients.entry_count())
108            .finish()
109    }
110}
111
112/// Intentionally not implementing `Default` to let callers be explicit about the pool size.
113impl<S> !Default for RpcClientPool<S> {}
114
115impl<S> RpcClientPool<S>
116where
117    S: RpcClient,
118{
119    /// Create a new pool with the given `connection_pool_size`, which is the number of
120    /// connections to each node that will be reused.
121    pub fn new(connection_pool_size: u16, opts: RpcClientConfig) -> Self {
122        Self {
123            connection_pool_size,
124            clients: Cache::new(u64::MAX),
125            opts,
126        }
127    }
128
129    /// Create a pool for testing purposes. Same as [`Self::adhoc`].
130    pub fn for_test() -> Self {
131        Self::adhoc()
132    }
133
134    /// Create a pool for ad-hoc usage, where the number of connections to each node is 1.
135    pub fn adhoc() -> Self {
136        Self::new(1, RpcClientConfig::default())
137    }
138
139    /// Gets the RPC client for the given node. If the connection is not established, a
140    /// new client will be created and returned.
141    pub async fn get(&self, node: &WorkerNode) -> Result<S> {
142        let addr = if node.get_type().unwrap() == WorkerType::Frontend {
143            let prop = node
144                .property
145                .as_ref()
146                .expect("frontend node property is missing");
147            HostAddr::from_str(prop.internal_rpc_host_addr.as_str())?
148        } else {
149            node.get_host().unwrap().into()
150        };
151
152        self.get_by_addr(addr).await
153    }
154
155    /// Gets the RPC client for the given addr. If the connection is not established, a
156    /// new client will be created and returned.
157    pub async fn get_by_addr(&self, addr: HostAddr) -> Result<S> {
158        Ok(self
159            .clients
160            .try_get_with(
161                addr.clone(),
162                S::new_clients(addr.clone(), self.connection_pool_size as usize, &self.opts),
163            )
164            .await
165            .with_context(|| format!("failed to create RPC client to {addr}"))?
166            .choose(&mut rand::rng())
167            .unwrap()
168            .clone())
169    }
170
171    pub fn invalidate_all(&self) {
172        self.clients.invalidate_all()
173    }
174}
175
/// Generates one `pub async fn` per `{ client, fn_name, req, resp }` entry.
///
/// Each generated method clones the `client` field of `self`, forwards `request` to the method
/// of the same name on that clone, maps a failure through `RpcError::from_stream_status` and
/// unwraps the successful response with `into_inner`.
#[macro_export]
macro_rules! stream_rpc_client_method_impl {
    ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
        $(
            pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
                let response = self
                    .$client
                    .to_owned()
                    .$fn_name(request)
                    .await
                    .map_err($crate::error::RpcError::from_stream_status)?;
                Ok(response.into_inner())
            }
        )*
    }
}
192
/// Generates one `pub async fn` per `{ client, fn_name, req, resp }` entry.
///
/// Each generated method clones the `client` field out of `self.core` (read access is only held
/// while cloning), forwards `request` to the method of the same name and unwraps the successful
/// response with `into_inner`. On failure, `self.refresh_client_if_needed` is called with the
/// status code before the status is converted via `RpcError::from_meta_status`.
#[macro_export]
macro_rules! meta_rpc_client_method_impl {
    ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
        $(
            pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
                let mut client = self.core.read().await.$client.to_owned();
                let status = match client.$fn_name(request).await {
                    Ok(response) => return Ok(response.into_inner()),
                    Err(status) => status,
                };
                self.refresh_client_if_needed(status.code()).await;
                Err($crate::error::RpcError::from_meta_status(status))
            }
        )*
    }
}
210
211pub const DEFAULT_BUFFER_SIZE: usize = 16;
212
213pub struct BidiStreamSender<REQ> {
214    tx: Sender<REQ>,
215}
216
217impl<REQ> BidiStreamSender<REQ> {
218    pub async fn send_request<R: Into<REQ>>(&mut self, request: R) -> Result<()> {
219        self.tx
220            .send(request.into())
221            .await
222            .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
223    }
224}
225
226pub struct BidiStreamReceiver<RSP> {
227    pub stream: Peekable<BoxStream<'static, Result<RSP>>>,
228}
229
230impl<RSP> BidiStreamReceiver<RSP> {
231    pub async fn next_response(&mut self) -> Result<RSP> {
232        self.stream
233            .next()
234            .await
235            .ok_or_else(|| anyhow!("end of response stream"))?
236    }
237}
238
239pub struct BidiStreamHandle<REQ, RSP> {
240    pub request_sender: BidiStreamSender<REQ>,
241    pub response_stream: BidiStreamReceiver<RSP>,
242}
243
244impl<REQ, RSP> Debug for BidiStreamHandle<REQ, RSP> {
245    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
246        f.write_str(type_name::<Self>())
247    }
248}
249
250impl<REQ, RSP> BidiStreamHandle<REQ, RSP> {
251    pub fn for_test(
252        request_sender: Sender<REQ>,
253        response_stream: BoxStream<'static, Result<RSP>>,
254    ) -> Self {
255        Self {
256            request_sender: BidiStreamSender { tx: request_sender },
257            response_stream: BidiStreamReceiver {
258                stream: response_stream.peekable(),
259            },
260        }
261    }
262
263    pub async fn initialize<
264        F: FnOnce(Receiver<REQ>) -> Fut,
265        St: Stream<Item = Result<RSP>> + Send + Unpin + 'static,
266        Fut: Future<Output = Result<St>> + Send,
267        R: Into<REQ>,
268    >(
269        first_request: R,
270        init_stream_fn: F,
271    ) -> Result<(Self, RSP)> {
272        let (request_sender, request_receiver) = channel(DEFAULT_BUFFER_SIZE);
273
274        // Send initial request in case of the blocking receive call from creating streaming request
275        request_sender
276            .send(first_request.into())
277            .await
278            .map_err(|_err| anyhow!("unable to send first request of {}", type_name::<REQ>()))?;
279
280        let mut response_stream = init_stream_fn(request_receiver).await?;
281
282        let first_response = response_stream
283            .next()
284            .await
285            .ok_or_else(|| anyhow!("get empty response from first request"))??;
286
287        Ok((
288            Self {
289                request_sender: BidiStreamSender { tx: request_sender },
290                response_stream: BidiStreamReceiver {
291                    stream: response_stream.boxed().peekable(),
292                },
293            },
294            first_response,
295        ))
296    }
297
298    pub async fn next_response(&mut self) -> Result<RSP> {
299        self.response_stream.next_response().await
300    }
301
302    pub async fn send_request(&mut self, request: REQ) -> Result<()> {
303        match await_future_with_monitor_error_stream(
304            &mut self.response_stream.stream,
305            self.request_sender.send_request(request),
306        )
307        .await
308        {
309            Ok(send_result) => send_result,
310            Err(None) => Err(anyhow!("end of response stream").into()),
311            Err(Some(e)) => Err(e),
312        }
313    }
314}
315
316/// The handle of a bidi-stream started from the rpc client. It is similar to the `BidiStreamHandle`
317/// except that its sender is unbounded.
318pub struct UnboundedBidiStreamHandle<REQ, RSP> {
319    pub request_sender: UnboundedSender<REQ>,
320    pub response_stream: BoxStream<'static, Result<RSP>>,
321}
322
323impl<REQ, RSP> Debug for UnboundedBidiStreamHandle<REQ, RSP> {
324    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
325        f.write_str(type_name::<Self>())
326    }
327}
328
329impl<REQ, RSP> UnboundedBidiStreamHandle<REQ, RSP> {
330    pub async fn initialize<
331        F: FnOnce(UnboundedReceiver<REQ>) -> Fut,
332        St: Stream<Item = Result<RSP>> + Send + Unpin + 'static,
333        Fut: Future<Output = Result<St>> + Send,
334        R: Into<REQ>,
335    >(
336        first_request: R,
337        init_stream_fn: F,
338    ) -> Result<(Self, RSP)> {
339        let (request_sender, request_receiver) = unbounded_channel();
340
341        // Send initial request in case of the blocking receive call from creating streaming request
342        request_sender
343            .send(first_request.into())
344            .map_err(|_err| anyhow!("unable to send first request of {}", type_name::<REQ>()))?;
345
346        let mut response_stream = init_stream_fn(request_receiver).await?;
347
348        let first_response = response_stream
349            .next()
350            .await
351            .context("get empty response from first request")??;
352
353        Ok((
354            Self {
355                request_sender,
356                response_stream: response_stream.boxed(),
357            },
358            first_response,
359        ))
360    }
361
362    pub async fn next_response(&mut self) -> Result<RSP> {
363        self.response_stream
364            .next()
365            .await
366            .ok_or_else(|| anyhow!("end of response stream"))?
367    }
368
369    pub fn send_request(&mut self, request: REQ) -> Result<()> {
370        self.request_sender
371            .send(request)
372            .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
373    }
374}