1#![feature(trait_alias)]
19#![feature(type_alias_impl_trait)]
20#![feature(associated_type_defaults)]
21#![feature(coroutines)]
22#![feature(iterator_try_collect)]
23#![feature(try_blocks)]
24#![feature(let_chains)]
25#![feature(impl_trait_in_assoc_type)]
26#![feature(error_generic_member_access)]
27#![feature(panic_update_hook)]
28#![feature(negative_impls)]
29
30use std::any::type_name;
31use std::fmt::{Debug, Formatter};
32use std::future::Future;
33use std::str::FromStr;
34use std::sync::Arc;
35
36use anyhow::{Context, anyhow};
37use async_trait::async_trait;
38pub use compactor_client::{CompactorClient, GrpcCompactorProxyClient};
39pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
40pub use connector_client::{SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
41use error::Result;
42pub use frontend_client::{FrontendClientPool, FrontendClientPoolRef};
43use futures::future::try_join_all;
44use futures::stream::{BoxStream, Peekable};
45use futures::{Stream, StreamExt};
46pub use hummock_meta_client::{
47 CompactionEventItem, HummockMetaClient, HummockMetaClientChangeLogInfo,
48 IcebergCompactionEventItem,
49};
50pub use meta_client::{MetaClient, SinkCoordinationRpcClient};
51use moka::future::Cache;
52use rand::prelude::IndexedRandom;
53use risingwave_common::config::RpcClientConfig;
54use risingwave_common::util::addr::HostAddr;
55use risingwave_pb::common::{WorkerNode, WorkerType};
56use rw_futures_util::await_future_with_monitor_error_stream;
57pub use sink_coordinate_client::CoordinatorStreamHandle;
58pub use stream_client::{
59 StreamClient, StreamClientPool, StreamClientPoolRef, StreamingControlHandle,
60};
61use tokio::sync::mpsc::{
62 Receiver, Sender, UnboundedReceiver, UnboundedSender, channel, unbounded_channel,
63};
64
65pub mod error;
66
67mod channel;
68mod compactor_client;
69mod compute_client;
70mod connector_client;
71mod frontend_client;
72mod hummock_meta_client;
73mod meta_client;
74mod sink_coordinate_client;
75mod stream_client;
76
77#[async_trait]
78pub trait RpcClient: Send + Sync + 'static + Clone {
79 async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self>;
80
81 async fn new_clients(
82 host_addr: HostAddr,
83 size: usize,
84 opts: &RpcClientConfig,
85 ) -> Result<Arc<Vec<Self>>> {
86 try_join_all(
87 std::iter::repeat_n(host_addr, size).map(|host_addr| Self::new_client(host_addr, opts)),
88 )
89 .await
90 .map(Arc::new)
91 }
92}
93
94#[derive(Clone)]
95pub struct RpcClientPool<S> {
96 connection_pool_size: u16,
97
98 clients: Cache<HostAddr, Arc<Vec<S>>>,
99
100 opts: RpcClientConfig,
101}
102
103impl<S> std::fmt::Debug for RpcClientPool<S> {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 f.debug_struct("RpcClientPool")
106 .field("connection_pool_size", &self.connection_pool_size)
107 .field("type", &type_name::<S>())
108 .field("len", &self.clients.entry_count())
109 .finish()
110 }
111}
112
113impl<S> !Default for RpcClientPool<S> {}
115
116impl<S> RpcClientPool<S>
117where
118 S: RpcClient,
119{
120 pub fn new(connection_pool_size: u16, opts: RpcClientConfig) -> Self {
123 Self {
124 connection_pool_size,
125 clients: Cache::new(u64::MAX),
126 opts,
127 }
128 }
129
130 pub fn for_test() -> Self {
132 Self::adhoc()
133 }
134
135 pub fn adhoc() -> Self {
137 Self::new(1, RpcClientConfig::default())
138 }
139
140 pub async fn get(&self, node: &WorkerNode) -> Result<S> {
143 let addr = if node.get_type().unwrap() == WorkerType::Frontend {
144 let prop = node
145 .property
146 .as_ref()
147 .expect("frontend node property is missing");
148 HostAddr::from_str(prop.internal_rpc_host_addr.as_str())?
149 } else {
150 node.get_host().unwrap().into()
151 };
152
153 self.get_by_addr(addr).await
154 }
155
156 pub async fn get_by_addr(&self, addr: HostAddr) -> Result<S> {
159 Ok(self
160 .clients
161 .try_get_with(
162 addr.clone(),
163 S::new_clients(addr.clone(), self.connection_pool_size as usize, &self.opts),
164 )
165 .await
166 .with_context(|| format!("failed to create RPC client to {addr}"))?
167 .choose(&mut rand::rng())
168 .unwrap()
169 .clone())
170 }
171
172 pub fn invalidate_all(&self) {
173 self.clients.invalidate_all()
174 }
175}
176
177#[macro_export]
178macro_rules! stream_rpc_client_method_impl {
179 ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
180 $(
181 pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
182 Ok(self
183 .$client
184 .to_owned()
185 .$fn_name(request)
186 .await
187 .map_err($crate::error::RpcError::from_stream_status)?
188 .into_inner())
189 }
190 )*
191 }
192}
193
194#[macro_export]
195macro_rules! meta_rpc_client_method_impl {
196 ($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
197 $(
198 pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
199 let mut client = self.core.read().await.$client.to_owned();
200 match client.$fn_name(request).await {
201 Ok(resp) => Ok(resp.into_inner()),
202 Err(e) => {
203 self.refresh_client_if_needed(e.code()).await;
204 Err($crate::error::RpcError::from_meta_status(e))
205 }
206 }
207 }
208 )*
209 }
210}
211
212pub const DEFAULT_BUFFER_SIZE: usize = 16;
213
214pub struct BidiStreamSender<REQ> {
215 tx: Sender<REQ>,
216}
217
218impl<REQ> BidiStreamSender<REQ> {
219 pub async fn send_request<R: Into<REQ>>(&mut self, request: R) -> Result<()> {
220 self.tx
221 .send(request.into())
222 .await
223 .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
224 }
225}
226
227pub struct BidiStreamReceiver<RSP> {
228 pub stream: Peekable<BoxStream<'static, Result<RSP>>>,
229}
230
231impl<RSP> BidiStreamReceiver<RSP> {
232 pub async fn next_response(&mut self) -> Result<RSP> {
233 self.stream
234 .next()
235 .await
236 .ok_or_else(|| anyhow!("end of response stream"))?
237 }
238}
239
240pub struct BidiStreamHandle<REQ, RSP> {
241 pub request_sender: BidiStreamSender<REQ>,
242 pub response_stream: BidiStreamReceiver<RSP>,
243}
244
245impl<REQ, RSP> Debug for BidiStreamHandle<REQ, RSP> {
246 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
247 f.write_str(type_name::<Self>())
248 }
249}
250
251impl<REQ, RSP> BidiStreamHandle<REQ, RSP> {
252 pub fn for_test(
253 request_sender: Sender<REQ>,
254 response_stream: BoxStream<'static, Result<RSP>>,
255 ) -> Self {
256 Self {
257 request_sender: BidiStreamSender { tx: request_sender },
258 response_stream: BidiStreamReceiver {
259 stream: response_stream.peekable(),
260 },
261 }
262 }
263
264 pub async fn initialize<
265 F: FnOnce(Receiver<REQ>) -> Fut,
266 St: Stream<Item = Result<RSP>> + Send + Unpin + 'static,
267 Fut: Future<Output = Result<St>> + Send,
268 R: Into<REQ>,
269 >(
270 first_request: R,
271 init_stream_fn: F,
272 ) -> Result<(Self, RSP)> {
273 let (request_sender, request_receiver) = channel(DEFAULT_BUFFER_SIZE);
274
275 request_sender
277 .send(first_request.into())
278 .await
279 .map_err(|_err| anyhow!("unable to send first request of {}", type_name::<REQ>()))?;
280
281 let mut response_stream = init_stream_fn(request_receiver).await?;
282
283 let first_response = response_stream
284 .next()
285 .await
286 .ok_or_else(|| anyhow!("get empty response from first request"))??;
287
288 Ok((
289 Self {
290 request_sender: BidiStreamSender { tx: request_sender },
291 response_stream: BidiStreamReceiver {
292 stream: response_stream.boxed().peekable(),
293 },
294 },
295 first_response,
296 ))
297 }
298
299 pub async fn next_response(&mut self) -> Result<RSP> {
300 self.response_stream.next_response().await
301 }
302
303 pub async fn send_request(&mut self, request: REQ) -> Result<()> {
304 match await_future_with_monitor_error_stream(
305 &mut self.response_stream.stream,
306 self.request_sender.send_request(request),
307 )
308 .await
309 {
310 Ok(send_result) => send_result,
311 Err(None) => Err(anyhow!("end of response stream").into()),
312 Err(Some(e)) => Err(e),
313 }
314 }
315}
316
317pub struct UnboundedBidiStreamHandle<REQ, RSP> {
320 pub request_sender: UnboundedSender<REQ>,
321 pub response_stream: BoxStream<'static, Result<RSP>>,
322}
323
324impl<REQ, RSP> Debug for UnboundedBidiStreamHandle<REQ, RSP> {
325 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
326 f.write_str(type_name::<Self>())
327 }
328}
329
330impl<REQ, RSP> UnboundedBidiStreamHandle<REQ, RSP> {
331 pub async fn initialize<
332 F: FnOnce(UnboundedReceiver<REQ>) -> Fut,
333 St: Stream<Item = Result<RSP>> + Send + Unpin + 'static,
334 Fut: Future<Output = Result<St>> + Send,
335 R: Into<REQ>,
336 >(
337 first_request: R,
338 init_stream_fn: F,
339 ) -> Result<(Self, RSP)> {
340 let (request_sender, request_receiver) = unbounded_channel();
341
342 request_sender
344 .send(first_request.into())
345 .map_err(|_err| anyhow!("unable to send first request of {}", type_name::<REQ>()))?;
346
347 let mut response_stream = init_stream_fn(request_receiver).await?;
348
349 let first_response = response_stream
350 .next()
351 .await
352 .context("get empty response from first request")??;
353
354 Ok((
355 Self {
356 request_sender,
357 response_stream: response_stream.boxed(),
358 },
359 first_response,
360 ))
361 }
362
363 pub async fn next_response(&mut self) -> Result<RSP> {
364 self.response_stream
365 .next()
366 .await
367 .ok_or_else(|| anyhow!("end of response stream"))?
368 }
369
370 pub fn send_request(&mut self, request: REQ) -> Result<()> {
371 self.request_sender
372 .send(request)
373 .map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()).into())
374 }
375}