risingwave_rpc_client/
stream_client.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use async_trait::async_trait;
20use futures::TryStreamExt;
21use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, RpcClientConfig};
22use risingwave_common::monitor::{EndpointExt, TcpConfig};
23use risingwave_common::util::addr::HostAddr;
24use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;
25use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
26use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse;
27use risingwave_pb::stream_service::*;
28use tokio_stream::wrappers::UnboundedReceiverStream;
29use tonic::transport::Endpoint;
30
31use crate::channel::{Channel, WrappedChannelExt};
32use crate::error::{Result, RpcError};
33use crate::{RpcClient, RpcClientPool, UnboundedBidiStreamHandle, stream_rpc_client_method_impl};
34
35#[derive(Clone)]
36pub struct StreamClient(StreamServiceClient<Channel>);
37
38#[async_trait]
39impl RpcClient for StreamClient {
40 async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
41 Self::new(host_addr, opts).await
42 }
43}
44
45impl StreamClient {
46 async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
47 let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
48 .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
49 .connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
50 .monitored_connect(
51 "grpc-stream-client",
52 TcpConfig {
53 tcp_nodelay: true,
54 ..Default::default()
55 },
56 )
57 .await?
58 .wrapped();
59
60 Ok(Self(
61 StreamServiceClient::new(channel).max_decoding_message_size(usize::MAX),
62 ))
63 }
64}
65
66pub type StreamClientPool = RpcClientPool<StreamClient>;
67pub type StreamClientPoolRef = Arc<StreamClientPool>;
68
/// Invokes the macro named by `$callback` with the list of stream-service RPC
/// entries.
///
/// Each entry is written as `{ <number>, <method name>, <request type>, <response type> }`.
/// NOTE(review): how the leading number is interpreted is decided by the callback
/// macro, not by this list — confirm against the callback's definition.
macro_rules! for_all_stream_rpc {
    ($callback:ident) => {
        $callback! {
            { 0, get_min_uncommitted_sst_id, GetMinUncommittedSstIdRequest, GetMinUncommittedSstIdResponse }
        }
    };
}
76
77impl StreamClient {
78 for_all_stream_rpc! { stream_rpc_client_method_impl }
79}
80
81pub type StreamingControlHandle =
82 UnboundedBidiStreamHandle<StreamingControlStreamRequest, StreamingControlStreamResponse>;
83
84impl StreamClient {
85 pub async fn start_streaming_control(
86 &self,
87 init_request: PbInitRequest,
88 ) -> Result<StreamingControlHandle> {
89 let first_request = StreamingControlStreamRequest {
90 request: Some(streaming_control_stream_request::Request::Init(
91 init_request,
92 )),
93 };
94 let mut client = self.0.to_owned();
95 let (handle, first_rsp) =
96 UnboundedBidiStreamHandle::initialize(first_request, |rx| async move {
97 client
98 .streaming_control_stream(UnboundedReceiverStream::new(rx))
99 .await
100 .map(|response| response.into_inner().map_err(RpcError::from_stream_status))
101 .map_err(RpcError::from_stream_status)
102 })
103 .await?;
104 match first_rsp {
105 StreamingControlStreamResponse {
106 response: Some(streaming_control_stream_response::Response::Init(InitResponse {})),
107 } => {}
108 other => {
109 return Err(anyhow!("expect InitResponse but get {:?}", other).into());
110 }
111 };
112 Ok(handle)
113 }
114}