risingwave_compute/rpc/service/
stream_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::{Stream, StreamExt, TryStreamExt};
16use risingwave_pb::stream_service::stream_service_server::StreamService;
17use risingwave_pb::stream_service::*;
18use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
19use tokio::sync::mpsc::unbounded_channel;
20use tokio_stream::wrappers::UnboundedReceiverStream;
21use tonic::{Request, Response, Status, Streaming};
22
23#[derive(Clone)]
24pub struct StreamServiceImpl {
25    pub mgr: LocalStreamManager,
26    pub env: StreamEnvironment,
27}
28
29impl StreamServiceImpl {
30    pub fn new(mgr: LocalStreamManager, env: StreamEnvironment) -> Self {
31        StreamServiceImpl { mgr, env }
32    }
33}
34
35#[async_trait::async_trait]
36impl StreamService for StreamServiceImpl {
37    type StreamingControlStreamStream =
38        impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;
39
40    async fn streaming_control_stream(
41        &self,
42        request: Request<Streaming<StreamingControlStreamRequest>>,
43    ) -> Result<Response<Self::StreamingControlStreamStream>, Status> {
44        let mut stream = request.into_inner().boxed();
45        let first_request = stream.try_next().await?;
46        let Some(StreamingControlStreamRequest {
47            request: Some(streaming_control_stream_request::Request::Init(init_request)),
48        }) = first_request
49        else {
50            return Err(Status::invalid_argument(format!(
51                "unexpected first request: {:?}",
52                first_request
53            )));
54        };
55        let (tx, rx) = unbounded_channel();
56        self.mgr.handle_new_control_stream(tx, stream, init_request);
57        Ok(Response::new(UnboundedReceiverStream::new(rx)))
58    }
59
60    async fn get_min_uncommitted_object_id(
61        &self,
62        _request: Request<GetMinUncommittedObjectIdRequest>,
63    ) -> Result<Response<GetMinUncommittedObjectIdResponse>, Status> {
64        let min_uncommitted_object_id =
65            if let Some(hummock) = self.mgr.env.state_store().as_hummock() {
66                hummock
67                    .min_uncommitted_object_id()
68                    .await
69                    .map(|object_id| object_id.inner())
70                    .unwrap_or(u64::MAX)
71            } else {
72                u64::MAX
73            };
74        Ok(Response::new(GetMinUncommittedObjectIdResponse {
75            min_uncommitted_object_id,
76        }))
77    }
78}