1#![feature(trait_alias)]
19#![feature(type_alias_impl_trait)]
20#![feature(associated_type_defaults)]
21#![feature(coroutines)]
22#![feature(iterator_try_collect)]
23#![feature(try_blocks)]
24#![feature(let_chains)]
25#![feature(impl_trait_in_assoc_type)]
26#![feature(error_generic_member_access)]
27#![feature(panic_update_hook)]
28#![feature(negative_impls)]
29
30use std::any::type_name;
31use std::fmt::{Debug, Formatter};
32use std::future::Future;
33use std::str::FromStr;
34use std::sync::Arc;
35
36use anyhow::{Context, anyhow};
37use async_trait::async_trait;
38pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
39pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
40pub use connector_client::{SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
41use error::Result;
42pub use frontend_client::{FrontendClientPool, FrontendClientPoolRef};
43use futures::future::try_join_all;
44use futures::stream::{BoxStream, Peekable};
45use futures::{Stream, StreamExt};
46pub use hummock_meta_client::{
47 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
48};
49pub use meta_client::{MetaClient, SinkCoordinationRpcClient};
50use moka::future::Cache;
51use rand::prelude::IndexedRandom;
52use risingwave_common::config::RpcClientConfig;
53use risingwave_common::util::addr::HostAddr;
54use risingwave_pb::common::{WorkerNode, WorkerType};
55use rw_futures_util::await_future_with_monitor_error_stream;
56pub use sink_coordinate_client::CoordinatorStreamHandle;
57pub use stream_client::{
58 StreamClient, StreamClientPool, StreamClientPoolRef, StreamingControlHandle,
59};
60use tokio::sync::mpsc::{
61 Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
62};
63
64pub mod error;
65
66mod channel;
67mod compactor_client;
68mod compute_client;
69mod connector_client;
70mod frontend_client;
71mod hummock_meta_client;
72mod meta_client;
73mod sink_coordinate_client;
74mod stream_client;
75
76#[async_trait]
77pub trait RpcClient: Send + Sync + 'static + Clone {
78 async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self>;
79
80 async fn new_clients(
81 host_addr: HostAddr,
82 size: usize,
83 opts: &RpcClientConfig,
84 ) -> Result<Arc<Vec<Self>>> {
85 try_join_all(
86 std::iter::repeat_n(host_addr, size).map(|host_addr| Self::new_client(host_addr, opts)),
87 )
88 .await
89 .map(Arc::new)
90 }
91}
92
93#[derive(Clone)]
94pub struct RpcClientPool<S> {
95 connection_pool_size: u16,
96
97 clients: Cache<HostAddr, Arc<Vec<S>>>,
98
99 opts: RpcClientConfig,
100}
101
102impl<S> std::fmt::Debug for RpcClientPool<S> {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 f.debug_struct("RpcClientPool")
105 .field("connection_pool_size", &self.connection_pool_size)
106 .field("type", &type_name::<S>())
107 .field("len", &self.clients.entry_count())
108 .finish()
109 }
110}
111
112impl<S> !Default for RpcClientPool<S> {}
114
115impl<S> RpcClientPool<S>
116where
117 S: RpcClient,
118{
119 pub fn new(connection_pool_size: u16, opts: RpcClientConfig) -> Self {
122 Self {
123 connection_pool_size,
124 clients: Cache::new(u64::MAX),
125 opts,
126 }
127 }
128
129 pub fn for_test() -> Self {
131 Self::adhoc()
132 }
133
134 pub fn adhoc() -> Self {
136 Self::new(1, RpcClientConfig::default())
137 }
138
139 pub async fn get(&self, node: &WorkerNode) -> Result<S> {
142 let addr = if node.get_type().unwrap() == WorkerType::Frontend {
143 let prop = node
144 .property
145 .as_ref()
146 .expect("frontend node property is missing");
147 HostAddr::from_str(prop.internal_rpc_host_addr.as_str())?
148 } else {
149 node.get_host().unwrap().into()
150 };
151
152 self.get_by_addr(addr).await
153 }
154
155 pub async fn get_by_addr(&self, addr: HostAddr) -> Result<S> {
158 Ok(self
159 .clients
160 .try_get_with(
161 addr.clone(),
162 S::new_clients(addr.clone(), self.connection_pool_size as usize, &self.opts),
163 )
164 .await
165 .with_context(|| format!("failed to create RPC client to {addr}"))?
166 .choose(&mut rand::rng())
167 .unwrap()
168 .clone())
169 }
170
171 pub fn invalidate_all(&self) {
172 self.clients.invalidate_all()
173 }
174}
175
176#[macro_export]
177macro_rules! stream_rpc_client_method_impl {
178 ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
179 $(
180 pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
181 Ok(self
182 .$client
183 .to_owned()
184 .$fn_name(request)
185 .await
186 .map_err($crate::error::RpcError::from_stream_status)?
187 .into_inner())
188 }
189 )*
190 }
191}
192
193#[macro_export]
194macro_rules! meta_rpc_client_method_impl {
195 ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
196 $(
197 pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
198 let mut client = self.core.read().await.$client.to_owned();
199 match client.$fn_name(request).await {
200 Ok(resp) => Ok(resp.into_inner()),
201 Err(e) => {
202 self.refresh_client_if_needed(e.code()).await;
203 Err($crate::error::RpcError::from_meta_status(e))
204 }
205 }
206 }
207 )*
208 }
209}
210
211pub const DEFAULT_BUFFER_SIZE: usize = 16;
212
213pub struct BidiStreamSender<REQ> {
214 tx: Sender<REQ>,
215}
216
217impl<REQ> BidiStreamSender<REQ> {
218 pub async fn send_request<R: Into<REQ>>(&mut self, request: R) -> Result<()> {
219 self.tx
220 .send(request.into())
221 .await
222 .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
223 }
224}
225
226pub struct BidiStreamReceiver<RSP> {
227 pub stream: Peekable<BoxStream<'static, Result<RSP>>>,
228}
229
230impl<RSP> BidiStreamReceiver<RSP> {
231 pub async fn next_response(&mut self) -> Result<RSP> {
232 self.stream
233 .next()
234 .await
235 .ok_or_else(|| anyhow!("end of response stream"))?
236 }
237}
238
239pub struct BidiStreamHandle<REQ, RSP> {
240 pub request_sender: BidiStreamSender<REQ>,
241 pub response_stream: BidiStreamReceiver<RSP>,
242}
243
244impl<REQ, RSP> Debug for BidiStreamHandle<REQ, RSP> {
245 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
246 f.write_str(type_name::<Self>())
247 }
248}
249
250impl<REQ, RSP> BidiStreamHandle<REQ, RSP> {
251 pub fn for_test(
252 request_sender: Sender<REQ>,
253 response_stream: BoxStream<'static, Result<RSP>>,
254 ) -> Self {
255 Self {
256 request_sender: BidiStreamSender { tx: request_sender },
257 response_stream: BidiStreamReceiver {
258 stream: response_stream.peekable(),
259 },
260 }
261 }
262
263 pub async fn initialize<
264 F: FnOnce(Receiver<REQ>) -> Fut,
265 St: Stream<Item = Result<RSP>> + Send + Unpin + 'static,
266 Fut: Future<Output = Result<St>> + Send,
267 R: Into<REQ>,
268 >(
269 first_request: R,
270 init_stream_fn: F,
271 ) -> Result<(Self, RSP)> {
272 let (request_sender, request_receiver) = channel(DEFAULT_BUFFER_SIZE);
273
274 request_sender
276 .send(first_request.into())
277 .await
278 .map_err(|_err| anyhow!("unable to send first request of {}", type_name::<REQ>()))?;
279
280 let mut response_stream = init_stream_fn(request_receiver).await?;
281
282 let first_response = response_stream
283 .next()
284 .await
285 .ok_or_else(|| anyhow!("get empty response from first request"))??;
286
287 Ok((
288 Self {
289 request_sender: BidiStreamSender { tx: request_sender },
290 response_stream: BidiStreamReceiver {
291 stream: response_stream.boxed().peekable(),
292 },
293 },
294 first_response,
295 ))
296 }
297
298 pub async fn next_response(&mut self) -> Result<RSP> {
299 self.response_stream.next_response().await
300 }
301
302 pub async fn send_request(&mut self, request: REQ) -> Result<()> {
303 match await_future_with_monitor_error_stream(
304 &mut self.response_stream.stream,
305 self.request_sender.send_request(request),
306 )
307 .await
308 {
309 Ok(send_result) => send_result,
310 Err(None) => Err(anyhow!("end of response stream").into()),
311 Err(Some(e)) => Err(e),
312 }
313 }
314}
315
316pub struct UnboundedBidiStreamHandle<REQ, RSP> {
319 pub request_sender: UnboundedSender<REQ>,
320 pub response_stream: BoxStream<'static, Result<RSP>>,
321}
322
323impl<REQ, RSP> Debug for UnboundedBidiStreamHandle<REQ, RSP> {
324 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
325 f.write_str(type_name::<Self>())
326 }
327}
328
329impl<REQ, RSP> UnboundedBidiStreamHandle<REQ, RSP> {
330 pub async fn initialize<
331 F: FnOnce(UnboundedReceiver<REQ>) -> Fut,
332 St: Stream<Item = Result<RSP>> + Send + Unpin + 'static,
333 Fut: Future<Output = Result<St>> + Send,
334 R: Into<REQ>,
335 >(
336 first_request: R,
337 init_stream_fn: F,
338 ) -> Result<(Self, RSP)> {
339 let (request_sender, request_receiver) = unbounded_channel();
340
341 request_sender
343 .send(first_request.into())
344 .map_err(|_err| anyhow!("unable to send first request of {}", type_name::<REQ>()))?;
345
346 let mut response_stream = init_stream_fn(request_receiver).await?;
347
348 let first_response = response_stream
349 .next()
350 .await
351 .context("get empty response from first request")??;
352
353 Ok((
354 Self {
355 request_sender,
356 response_stream: response_stream.boxed(),
357 },
358 first_response,
359 ))
360 }
361
362 pub async fn next_response(&mut self) -> Result<RSP> {
363 self.response_stream
364 .next()
365 .await
366 .ok_or_else(|| anyhow!("end of response stream"))?
367 }
368
369 pub fn send_request(&mut self, request: REQ) -> Result<()> {
370 self.request_sender
371 .send(request)
372 .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
373 }
374}