risingwave_compute/rpc/service/
stream_service.rs1use futures::{Stream, StreamExt, TryStreamExt};
16use risingwave_pb::stream_service::stream_service_server::StreamService;
17use risingwave_pb::stream_service::*;
18use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
19use tokio::sync::mpsc::unbounded_channel;
20use tokio_stream::wrappers::UnboundedReceiverStream;
21use tonic::{Request, Response, Status, Streaming};
22
23#[derive(Clone)]
24pub struct StreamServiceImpl {
25 pub mgr: LocalStreamManager,
26 pub env: StreamEnvironment,
27}
28
29impl StreamServiceImpl {
30 pub fn new(mgr: LocalStreamManager, env: StreamEnvironment) -> Self {
31 StreamServiceImpl { mgr, env }
32 }
33}
34
35#[async_trait::async_trait]
36impl StreamService for StreamServiceImpl {
37 type StreamingControlStreamStream =
38 impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;
39
40 async fn streaming_control_stream(
41 &self,
42 request: Request<Streaming<StreamingControlStreamRequest>>,
43 ) -> Result<Response<Self::StreamingControlStreamStream>, Status> {
44 let mut stream = request.into_inner().boxed();
45 let first_request = stream.try_next().await?;
46 let Some(StreamingControlStreamRequest {
47 request: Some(streaming_control_stream_request::Request::Init(init_request)),
48 }) = first_request
49 else {
50 return Err(Status::invalid_argument(format!(
51 "unexpected first request: {:?}",
52 first_request
53 )));
54 };
55 let (tx, rx) = unbounded_channel();
56 self.mgr.handle_new_control_stream(tx, stream, init_request);
57 Ok(Response::new(UnboundedReceiverStream::new(rx)))
58 }
59
60 async fn get_min_uncommitted_object_id(
61 &self,
62 _request: Request<GetMinUncommittedObjectIdRequest>,
63 ) -> Result<Response<GetMinUncommittedObjectIdResponse>, Status> {
64 let min_uncommitted_object_id =
65 if let Some(hummock) = self.mgr.env.state_store().as_hummock() {
66 hummock
67 .min_uncommitted_object_id()
68 .await
69 .map(|object_id| object_id.inner())
70 .unwrap_or(u64::MAX)
71 } else {
72 u64::MAX
73 };
74 Ok(Response::new(GetMinUncommittedObjectIdResponse {
75 min_uncommitted_object_id,
76 }))
77 }
78}