1#![feature(trait_alias)]
19#![feature(type_alias_impl_trait)]
20#![feature(associated_type_defaults)]
21#![feature(coroutines)]
22#![feature(iterator_try_collect)]
23#![feature(try_blocks)]
24#![feature(impl_trait_in_assoc_type)]
25#![feature(error_generic_member_access)]
26#![feature(panic_update_hook)]
27#![feature(negative_impls)]
28
29use std::any::type_name;
30use std::fmt::{Debug, Formatter};
31use std::future::Future;
32use std::str::FromStr;
33use std::sync::Arc;
34
35use anyhow::{Context, anyhow};
36use async_trait::async_trait;
37pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
38pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
39pub use connector_client::{SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
40use error::Result;
41pub use frontend_client::{FrontendClientPool, FrontendClientPoolRef};
42use futures::future::try_join_all;
43use futures::stream::{BoxStream, Peekable, TryStreamExt};
44use futures::{Stream, StreamExt};
45pub use hummock_meta_client::{
46 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
47 IcebergCompactionEventItem,
48};
49pub use meta_client::{MetaClient, SinkCoordinationRpcClient};
50use moka::future::Cache;
51pub use monitor_client::{MonitorClient, MonitorClientPool, MonitorClientPoolRef};
52use rand::prelude::IndexedRandom;
53use risingwave_common::config::RpcClientConfig;
54use risingwave_common::util::addr::HostAddr;
55use risingwave_pb::common::{WorkerNode, WorkerType};
56use rw_futures_util::await_future_with_monitor_error_stream;
57pub use sink_coordinate_client::CoordinatorStreamHandle;
58pub use stream_client::{
59 StreamClient, StreamClientPool, StreamClientPoolRef, StreamingControlHandle,
60};
61use tokio::sync::mpsc::{
62 Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
63};
64
65pub mod error;
66
67mod channel;
68mod compactor_client;
69mod compute_client;
70mod connector_client;
71mod frontend_client;
72mod hummock_meta_client;
73mod meta_client;
74mod monitor_client;
75mod sink_coordinate_client;
76mod stream_client;
77
78#[async_trait]
79pub trait RpcClient: Send + Sync + 'static + Clone {
80 async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self>;
81
82 async fn new_clients(
83 host_addr: HostAddr,
84 size: usize,
85 opts: &RpcClientConfig,
86 ) -> Result<Arc<Vec<Self>>> {
87 let make_clients = || {
88 std::iter::repeat_n(host_addr.clone(), size)
89 .map(|host_addr| Self::new_client(host_addr, opts))
90 };
91 let concurrency = opts.pool_setup_concurrency;
92 if concurrency == 0 || concurrency >= size {
93 try_join_all(make_clients()).await.map(Arc::new)
94 } else {
95 futures::stream::iter(make_clients())
96 .buffer_unordered(concurrency)
97 .try_collect::<Vec<_>>()
98 .await
99 .map(Arc::new)
100 }
101 }
102}
103
104#[derive(Clone)]
105pub struct RpcClientPool<S> {
106 connection_pool_size: u16,
107
108 clients: Cache<HostAddr, Arc<Vec<S>>>,
109
110 opts: RpcClientConfig,
111}
112
113impl<S> std::fmt::Debug for RpcClientPool<S> {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 f.debug_struct("RpcClientPool")
116 .field("connection_pool_size", &self.connection_pool_size)
117 .field("type", &type_name::<S>())
118 .field("len", &self.clients.entry_count())
119 .finish()
120 }
121}
122
123impl<S> !Default for RpcClientPool<S> {}
125
126impl<S> RpcClientPool<S>
127where
128 S: RpcClient,
129{
130 pub fn new(connection_pool_size: u16, opts: RpcClientConfig) -> Self {
133 Self {
134 connection_pool_size,
135 clients: Cache::new(u64::MAX),
136 opts,
137 }
138 }
139
140 pub fn for_test() -> Self {
142 Self::adhoc()
143 }
144
145 pub fn adhoc() -> Self {
147 Self::new(1, RpcClientConfig::default())
148 }
149
150 pub async fn get(&self, node: &WorkerNode) -> Result<S> {
153 let addr = if node.get_type().unwrap() == WorkerType::Frontend {
154 let prop = node
155 .property
156 .as_ref()
157 .expect("frontend node property is missing");
158 HostAddr::from_str(prop.internal_rpc_host_addr.as_str())?
159 } else {
160 node.get_host().unwrap().into()
161 };
162
163 self.get_by_addr(addr).await
164 }
165
166 pub async fn get_by_addr(&self, addr: HostAddr) -> Result<S> {
169 Ok(self
170 .clients
171 .try_get_with(
172 addr.clone(),
173 S::new_clients(addr.clone(), self.connection_pool_size as usize, &self.opts),
174 )
175 .await
176 .with_context(|| format!("failed to create RPC client to {addr}"))?
177 .choose(&mut rand::rng())
178 .unwrap()
179 .clone())
180 }
181
182 pub fn invalidate_all(&self) {
183 self.clients.invalidate_all()
184 }
185}
186
187#[macro_export]
188macro_rules! stream_rpc_client_method_impl {
189 ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
190 $(
191 pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
192 Ok(self
193 .$client
194 .to_owned()
195 .$fn_name(request)
196 .await
197 .map_err($crate::error::RpcError::from_stream_status)?
198 .into_inner())
199 }
200 )*
201 }
202}
203
204#[macro_export]
205macro_rules! meta_rpc_client_method_impl {
206 ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
207 $(
208 pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
209 let mut client = self.core.read().await.$client.to_owned();
210 match client.$fn_name(request).await {
211 Ok(resp) => Ok(resp.into_inner()),
212 Err(e) => {
213 self.refresh_client_if_needed(e.code()).await;
214 Err($crate::error::RpcError::from_meta_status(e))
215 }
216 }
217 }
218 )*
219 }
220}
221
222pub const DEFAULT_BUFFER_SIZE: usize = 16;
223
224pub struct BidiStreamSender<REQ> {
225 tx: Sender<REQ>,
226}
227
228impl<REQ> BidiStreamSender<REQ> {
229 pub async fn send_request<R: Into<REQ>>(&mut self, request: R) -> Result<()> {
230 self.tx
231 .send(request.into())
232 .await
233 .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
234 }
235}
236
237pub struct BidiStreamReceiver<RSP> {
238 pub stream: Peekable<BoxStream<'static, Result<RSP>>>,
239}
240
241impl<RSP> BidiStreamReceiver<RSP> {
242 pub async fn next_response(&mut self) -> Result<RSP> {
243 self.stream
244 .next()
245 .await
246 .ok_or_else(|| anyhow!("end of response stream"))?
247 }
248}
249
250pub struct BidiStreamHandle<REQ, RSP> {
251 pub request_sender: BidiStreamSender<REQ>,
252 pub response_stream: BidiStreamReceiver<RSP>,
253}
254
255impl<REQ, RSP> Debug for BidiStreamHandle<REQ, RSP> {
256 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
257 f.write_str(type_name::<Self>())
258 }
259}
260
261impl<REQ, RSP> BidiStreamHandle<REQ, RSP> {
262 pub fn for_test(
263 request_sender: Sender<REQ>,
264 response_stream: BoxStream<'static, Result<RSP>>,
265 ) -> Self {
266 Self {
267 request_sender: BidiStreamSender { tx: request_sender },
268 response_stream: BidiStreamReceiver {
269 stream: response_stream.peekable(),
270 },
271 }
272 }
273
274 pub async fn initialize<
275 F: FnOnce(Receiver<REQ>) -> Fut,
276 St: Stream<Item = Result<RSP>> + Send + Unpin + 'static,
277 Fut: Future<Output = Result<St>> + Send,
278 R: Into<REQ>,
279 >(
280 first_request: R,
281 init_stream_fn: F,
282 ) -> Result<(Self, RSP)> {
283 let (request_sender, request_receiver) = channel(DEFAULT_BUFFER_SIZE);
284
285 request_sender
287 .send(first_request.into())
288 .await
289 .map_err(|_err| anyhow!("unable to send first request of {}", type_name::<REQ>()))?;
290
291 let mut response_stream = init_stream_fn(request_receiver).await?;
292
293 let first_response = response_stream
294 .next()
295 .await
296 .ok_or_else(|| anyhow!("get empty response from first request"))??;
297
298 Ok((
299 Self {
300 request_sender: BidiStreamSender { tx: request_sender },
301 response_stream: BidiStreamReceiver {
302 stream: response_stream.boxed().peekable(),
303 },
304 },
305 first_response,
306 ))
307 }
308
309 pub async fn next_response(&mut self) -> Result<RSP> {
310 self.response_stream.next_response().await
311 }
312
313 pub async fn send_request(&mut self, request: REQ) -> Result<()> {
314 match await_future_with_monitor_error_stream(
315 &mut self.response_stream.stream,
316 self.request_sender.send_request(request),
317 )
318 .await
319 {
320 Ok(send_result) => send_result,
321 Err(None) => Err(anyhow!("end of response stream").into()),
322 Err(Some(e)) => Err(e),
323 }
324 }
325}
326
327pub struct UnboundedBidiStreamHandle<REQ, RSP> {
330 pub request_sender: UnboundedSender<REQ>,
331 pub response_stream: BoxStream<'static, Result<RSP>>,
332}
333
334impl<REQ, RSP> Debug for UnboundedBidiStreamHandle<REQ, RSP> {
335 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
336 f.write_str(type_name::<Self>())
337 }
338}
339
340impl<REQ, RSP> UnboundedBidiStreamHandle<REQ, RSP> {
341 pub async fn initialize<
342 F: FnOnce(UnboundedReceiver<REQ>) -> Fut,
343 St: Stream<Item = Result<RSP>> + Send + Unpin + 'static,
344 Fut: Future<Output = Result<St>> + Send,
345 R: Into<REQ>,
346 >(
347 first_request: R,
348 init_stream_fn: F,
349 ) -> Result<(Self, RSP)> {
350 let (request_sender, request_receiver) = unbounded_channel();
351
352 request_sender
354 .send(first_request.into())
355 .map_err(|_err| anyhow!("unable to send first request of {}", type_name::<REQ>()))?;
356
357 let mut response_stream = init_stream_fn(request_receiver).await?;
358
359 let first_response = response_stream
360 .next()
361 .await
362 .context("get empty response from first request")??;
363
364 Ok((
365 Self {
366 request_sender,
367 response_stream: response_stream.boxed(),
368 },
369 first_response,
370 ))
371 }
372
373 pub async fn next_response(&mut self) -> Result<RSP> {
374 self.response_stream
375 .next()
376 .await
377 .ok_or_else(|| anyhow!("end of response stream"))?
378 }
379
380 pub fn send_request(&mut self, request: REQ) -> Result<()> {
381 self.request_sender
382 .send(request)
383 .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
384 }
385}