risingwave_compute/rpc/service/
stream_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::{Stream, StreamExt, TryStreamExt};
16use risingwave_hummock_sdk::HummockSstableObjectId;
17use risingwave_pb::stream_service::stream_service_server::StreamService;
18use risingwave_pb::stream_service::*;
19use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
20use tokio::sync::mpsc::unbounded_channel;
21use tokio_stream::wrappers::UnboundedReceiverStream;
22use tonic::{Request, Response, Status, Streaming};
23
24#[derive(Clone)]
25pub struct StreamServiceImpl {
26    pub mgr: LocalStreamManager,
27    pub env: StreamEnvironment,
28}
29
30impl StreamServiceImpl {
31    pub fn new(mgr: LocalStreamManager, env: StreamEnvironment) -> Self {
32        StreamServiceImpl { mgr, env }
33    }
34}
35
36#[async_trait::async_trait]
37impl StreamService for StreamServiceImpl {
38    type StreamingControlStreamStream =
39        impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;
40
41    async fn streaming_control_stream(
42        &self,
43        request: Request<Streaming<StreamingControlStreamRequest>>,
44    ) -> Result<Response<Self::StreamingControlStreamStream>, Status> {
45        let mut stream = request.into_inner().boxed();
46        let first_request = stream.try_next().await?;
47        let Some(StreamingControlStreamRequest {
48            request: Some(streaming_control_stream_request::Request::Init(init_request)),
49        }) = first_request
50        else {
51            return Err(Status::invalid_argument(format!(
52                "unexpected first request: {:?}",
53                first_request
54            )));
55        };
56        let (tx, rx) = unbounded_channel();
57        self.mgr.handle_new_control_stream(tx, stream, init_request);
58        Ok(Response::new(UnboundedReceiverStream::new(rx)))
59    }
60
61    async fn get_min_uncommitted_sst_id(
62        &self,
63        _request: Request<GetMinUncommittedSstIdRequest>,
64    ) -> Result<Response<GetMinUncommittedSstIdResponse>, Status> {
65        let min_uncommitted_sst_id = if let Some(hummock) = self.mgr.env.state_store().as_hummock()
66        {
67            hummock
68                .min_uncommitted_sst_id()
69                .await
70                .unwrap_or(HummockSstableObjectId::MAX)
71        } else {
72            HummockSstableObjectId::MAX
73        };
74        Ok(Response::new(GetMinUncommittedSstIdResponse {
75            min_uncommitted_sst_id,
76        }))
77    }
78}