risingwave_rpc_client/
stream_client.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use async_trait::async_trait;
20use futures::TryStreamExt;
21use risingwave_common::config::{MAX_CONNECTION_WINDOW_SIZE, RpcClientConfig};
22use risingwave_common::monitor::{EndpointExt, TcpConfig};
23use risingwave_common::util::addr::HostAddr;
24use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;
25use risingwave_pb::stream_service::streaming_control_stream_request::PbInitRequest;
26use risingwave_pb::stream_service::streaming_control_stream_response::InitResponse;
27use risingwave_pb::stream_service::*;
28use tokio_stream::wrappers::UnboundedReceiverStream;
29use tonic::transport::Endpoint;
30
31use crate::channel::{Channel, WrappedChannelExt};
32use crate::error::{Result, RpcError};
33use crate::{RpcClient, RpcClientPool, UnboundedBidiStreamHandle, stream_rpc_client_method_impl};
34
/// Client for the stream service.
///
/// A newtype over the generated [`StreamServiceClient`] running on this crate's
/// [`Channel`]. Cloning the wrapper clones the inner generated client.
#[derive(Clone)]
pub struct StreamClient(StreamServiceClient<Channel>);
37
38#[async_trait]
39impl RpcClient for StreamClient {
40    async fn new_client(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
41        Self::new(host_addr, opts).await
42    }
43}
44
45impl StreamClient {
46    async fn new(host_addr: HostAddr, opts: &RpcClientConfig) -> Result<Self> {
47        let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
48            .initial_connection_window_size(MAX_CONNECTION_WINDOW_SIZE)
49            .connect_timeout(Duration::from_secs(opts.connect_timeout_secs))
50            .monitored_connect(
51                "grpc-stream-client",
52                TcpConfig {
53                    tcp_nodelay: true,
54                    ..Default::default()
55                },
56            )
57            .await?
58            .wrapped();
59
60        Ok(Self(
61            StreamServiceClient::new(channel).max_decoding_message_size(usize::MAX),
62        ))
63    }
64}
65
/// Pool of [`StreamClient`]s, expressed through the crate's generic [`RpcClientPool`].
pub type StreamClientPool = RpcClientPool<StreamClient>;
/// Shared, reference-counted ([`Arc`]) handle to a [`StreamClientPool`].
pub type StreamClientPoolRef = Arc<StreamClientPool>;
68
/// Invokes the macro named by `$macro` with this file's table of stream-service RPCs.
///
/// Each table entry is a braced group of four items: a leading `0`, the method
/// name, the request type and the response type.
///
/// NOTE(review): the leading `0` is presumably the tuple-field index of the inner
/// client on `StreamClient` (i.e. `self.0`) — confirm against the definition of
/// the macro that consumes this table.
macro_rules! for_all_stream_rpc {
    ($macro:ident) => {
        $macro! {
            { 0, get_min_uncommitted_sst_id, GetMinUncommittedSstIdRequest, GetMinUncommittedSstIdResponse }
        }
    };
}
76
impl StreamClient {
    // Feeds the RPC table from `for_all_stream_rpc!` to `stream_rpc_client_method_impl`,
    // which is imported from the crate root.
    // NOTE(review): presumably this expands to one async method per table entry
    // (currently only `get_min_uncommitted_sst_id`) — confirm in the macro definition.
    for_all_stream_rpc! { stream_rpc_client_method_impl }
}
80
/// Handle type returned by [`StreamClient::start_streaming_control`]: an
/// [`UnboundedBidiStreamHandle`] parameterized with [`StreamingControlStreamRequest`]
/// as the request type and [`StreamingControlStreamResponse`] as the response type.
pub type StreamingControlHandle =
    UnboundedBidiStreamHandle<StreamingControlStreamRequest, StreamingControlStreamResponse>;
83
84impl StreamClient {
85    pub async fn start_streaming_control(
86        &self,
87        init_request: PbInitRequest,
88    ) -> Result<StreamingControlHandle> {
89        let first_request = StreamingControlStreamRequest {
90            request: Some(streaming_control_stream_request::Request::Init(
91                init_request,
92            )),
93        };
94        let mut client = self.0.to_owned();
95        let (handle, first_rsp) =
96            UnboundedBidiStreamHandle::initialize(first_request, |rx| async move {
97                client
98                    .streaming_control_stream(UnboundedReceiverStream::new(rx))
99                    .await
100                    .map(|response| response.into_inner().map_err(RpcError::from_stream_status))
101                    .map_err(RpcError::from_stream_status)
102            })
103            .await?;
104        match first_rsp {
105            StreamingControlStreamResponse {
106                response: Some(streaming_control_stream_response::Response::Init(InitResponse {})),
107            } => {}
108            other => {
109                return Err(anyhow!("expect InitResponse but get {:?}", other).into());
110            }
111        };
112        Ok(handle)
113    }
114}