1#![feature(trait_alias)]
19#![feature(type_alias_impl_trait)]
20#![feature(associated_type_defaults)]
21#![feature(coroutines)]
22#![feature(iterator_try_collect)]
23#![feature(try_blocks)]
24#![feature(impl_trait_in_assoc_type)]
25#![feature(error_generic_member_access)]
26#![feature(panic_update_hook)]
27#![feature(negative_impls)]
28
29use std::any::type_name;
30use std::fmt::{Debug, Formatter};
31use std::future::Future;
32use std::str::FromStr;
33use std::sync::Arc;
34
35use anyhow::{Context, anyhow};
36use async_trait::async_trait;
37pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
38pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
39pub use connector_client::{SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
40use error::Result;
41pub use frontend_client::{FrontendClientPool, FrontendClientPoolRef};
42use futures::future::try_join_all;
43use futures::stream::{BoxStream, Peekable};
44use futures::{Stream, StreamExt};
45pub use hummock_meta_client::{
46 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
47 IcebergCompactionEventItem,
48};
49pub use meta_client::{MetaClient, SinkCoordinationRpcClient};
50use moka::future::Cache;
51pub use monitor_client::{MonitorClient, MonitorClientPool, MonitorClientPoolRef};
52use rand::prelude::IndexedRandom;
53use risingwave_common::config::RpcClientConfig;
54use risingwave_common::util::addr::HostAddr;
55use risingwave_pb::common::{WorkerNode, WorkerType};
56use rw_futures_util::await_future_with_monitor_error_stream;
57pub use sink_coordinate_client::CoordinatorStreamHandle;
58pub use stream_client::{
59 StreamClient, StreamClientPool, StreamClientPoolRef, StreamingControlHandle,
60};
61use tokio::sync::mpsc::{
62 Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
63};
64
65pub mod error;
66
67mod channel;
68mod compactor_client;
69mod compute_client;
70mod connector_client;
71mod frontend_client;
72mod hummock_meta_client;
73mod meta_client;
74mod monitor_client;
75mod sink_coordinate_client;
76mod stream_client;
77
78#[async_trait]
79pub trait RpcClient: Send + Sync + 'static + Clone {
80 async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self>;
81
82 async fn new_clients(
83 host_addr: HostAddr,
84 size: usize,
85 opts: &RpcClientConfig,
86 ) -> Result<Arc<Vec<Self>>> {
87 try_join_all(
88 std::iter::repeat_n(host_addr, size).map(|host_addr| Self::new_client(host_addr, opts)),
89 )
90 .await
91 .map(Arc::new)
92 }
93}
94
95#[derive(Clone)]
96pub struct RpcClientPool<S> {
97 connection_pool_size: u16,
98
99 clients: Cache<HostAddr, Arc<Vec<S>>>,
100
101 opts: RpcClientConfig,
102}
103
104impl<S> std::fmt::Debug for RpcClientPool<S> {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct("RpcClientPool")
107 .field("connection_pool_size", &self.connection_pool_size)
108 .field("type", &type_name::<S>())
109 .field("len", &self.clients.entry_count())
110 .finish()
111 }
112}
113
114impl<S> !Default for RpcClientPool<S> {}
116
117impl<S> RpcClientPool<S>
118where
119 S: RpcClient,
120{
121 pub fn new(connection_pool_size: u16, opts: RpcClientConfig) -> Self {
124 Self {
125 connection_pool_size,
126 clients: Cache::new(u64::MAX),
127 opts,
128 }
129 }
130
131 pub fn for_test() -> Self {
133 Self::adhoc()
134 }
135
136 pub fn adhoc() -> Self {
138 Self::new(1, RpcClientConfig::default())
139 }
140
141 pub async fn get(&self, node: &WorkerNode) -> Result<S> {
144 let addr = if node.get_type().unwrap() == WorkerType::Frontend {
145 let prop = node
146 .property
147 .as_ref()
148 .expect("frontend node property is missing");
149 HostAddr::from_str(prop.internal_rpc_host_addr.as_str())?
150 } else {
151 node.get_host().unwrap().into()
152 };
153
154 self.get_by_addr(addr).await
155 }
156
157 pub async fn get_by_addr(&self, addr: HostAddr) -> Result<S> {
160 Ok(self
161 .clients
162 .try_get_with(
163 addr.clone(),
164 S::new_clients(addr.clone(), self.connection_pool_size as usize, &self.opts),
165 )
166 .await
167 .with_context(|| format!("failed to create RPC client to {addr}"))?
168 .choose(&mut rand::rng())
169 .unwrap()
170 .clone())
171 }
172
173 pub fn invalidate_all(&self) {
174 self.clients.invalidate_all()
175 }
176}
177
178#[macro_export]
179macro_rules! stream_rpc_client_method_impl {
180 ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
181 $(
182 pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
183 Ok(self
184 .$client
185 .to_owned()
186 .$fn_name(request)
187 .await
188 .map_err($crate::error::RpcError::from_stream_status)?
189 .into_inner())
190 }
191 )*
192 }
193}
194
195#[macro_export]
196macro_rules! meta_rpc_client_method_impl {
197 ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
198 $(
199 pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
200 let mut client = self.core.read().await.$client.to_owned();
201 match client.$fn_name(request).await {
202 Ok(resp) => Ok(resp.into_inner()),
203 Err(e) => {
204 self.refresh_client_if_needed(e.code()).await;
205 Err($crate::error::RpcError::from_meta_status(e))
206 }
207 }
208 }
209 )*
210 }
211}
212
213pub const DEFAULT_BUFFER_SIZE: usize = 16;
214
215pub struct BidiStreamSender<REQ> {
216 tx: Sender<REQ>,
217}
218
219impl<REQ> BidiStreamSender<REQ> {
220 pub async fn send_request<R: Into<REQ>>(&mut self, request: R) -> Result<()> {
221 self.tx
222 .send(request.into())
223 .await
224 .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
225 }
226}
227
228pub struct BidiStreamReceiver<RSP> {
229 pub stream: Peekable<BoxStream<'static, Result<RSP>>>,
230}
231
232impl<RSP> BidiStreamReceiver<RSP> {
233 pub async fn next_response(&mut self) -> Result<RSP> {
234 self.stream
235 .next()
236 .await
237 .ok_or_else(|| anyhow!("end of response stream"))?
238 }
239}
240
241pub struct BidiStreamHandle<REQ, RSP> {
242 pub request_sender: BidiStreamSender<REQ>,
243 pub response_stream: BidiStreamReceiver<RSP>,
244}
245
246impl<REQ, RSP> Debug for BidiStreamHandle<REQ, RSP> {
247 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
248 f.write_str(type_name::<Self>())
249 }
250}
251
252impl<REQ, RSP> BidiStreamHandle<REQ, RSP> {
253 pub fn for_test(
254 request_sender: Sender<REQ>,
255 response_stream: BoxStream<'static, Result<RSP>>,
256 ) -> Self {
257 Self {
258 request_sender: BidiStreamSender { tx: request_sender },
259 response_stream: BidiStreamReceiver {
260 stream: response_stream.peekable(),
261 },
262 }
263 }
264
265 pub async fn initialize<
266 F: FnOnce(Receiver<REQ>) -> Fut,
267 St: Stream<Item = Result<RSP>> + Send + Unpin + 'static,
268 Fut: Future<Output = Result<St>> + Send,
269 R: Into<REQ>,
270 >(
271 first_request: R,
272 init_stream_fn: F,
273 ) -> Result<(Self, RSP)> {
274 let (request_sender, request_receiver) = channel(DEFAULT_BUFFER_SIZE);
275
276 request_sender
278 .send(first_request.into())
279 .await
280 .map_err(|_err| anyhow!("unable to send first request of {}", type_name::<REQ>()))?;
281
282 let mut response_stream = init_stream_fn(request_receiver).await?;
283
284 let first_response = response_stream
285 .next()
286 .await
287 .ok_or_else(|| anyhow!("get empty response from first request"))??;
288
289 Ok((
290 Self {
291 request_sender: BidiStreamSender { tx: request_sender },
292 response_stream: BidiStreamReceiver {
293 stream: response_stream.boxed().peekable(),
294 },
295 },
296 first_response,
297 ))
298 }
299
300 pub async fn next_response(&mut self) -> Result<RSP> {
301 self.response_stream.next_response().await
302 }
303
304 pub async fn send_request(&mut self, request: REQ) -> Result<()> {
305 match await_future_with_monitor_error_stream(
306 &mut self.response_stream.stream,
307 self.request_sender.send_request(request),
308 )
309 .await
310 {
311 Ok(send_result) => send_result,
312 Err(None) => Err(anyhow!("end of response stream").into()),
313 Err(Some(e)) => Err(e),
314 }
315 }
316}
317
318pub struct UnboundedBidiStreamHandle<REQ, RSP> {
321 pub request_sender: UnboundedSender<REQ>,
322 pub response_stream: BoxStream<'static, Result<RSP>>,
323}
324
325impl<REQ, RSP> Debug for UnboundedBidiStreamHandle<REQ, RSP> {
326 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
327 f.write_str(type_name::<Self>())
328 }
329}
330
331impl<REQ, RSP> UnboundedBidiStreamHandle<REQ, RSP> {
332 pub async fn initialize<
333 F: FnOnce(UnboundedReceiver<REQ>) -> Fut,
334 St: Stream<Item = Result<RSP>> + Send + Unpin + 'static,
335 Fut: Future<Output = Result<St>> + Send,
336 R: Into<REQ>,
337 >(
338 first_request: R,
339 init_stream_fn: F,
340 ) -> Result<(Self, RSP)> {
341 let (request_sender, request_receiver) = unbounded_channel();
342
343 request_sender
345 .send(first_request.into())
346 .map_err(|_err| anyhow!("unable to send first request of {}", type_name::<REQ>()))?;
347
348 let mut response_stream = init_stream_fn(request_receiver).await?;
349
350 let first_response = response_stream
351 .next()
352 .await
353 .context("get empty response from first request")??;
354
355 Ok((
356 Self {
357 request_sender,
358 response_stream: response_stream.boxed(),
359 },
360 first_response,
361 ))
362 }
363
364 pub async fn next_response(&mut self) -> Result<RSP> {
365 self.response_stream
366 .next()
367 .await
368 .ok_or_else(|| anyhow!("end of response stream"))?
369 }
370
371 pub fn send_request(&mut self, request: REQ) -> Result<()> {
372 self.request_sender
373 .send(request)
374 .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
375 }
376}