risingwave_compute/rpc/service/
stream_service.rs1use futures::{Stream, StreamExt, TryStreamExt};
16use risingwave_hummock_sdk::HummockSstableObjectId;
17use risingwave_pb::stream_service::stream_service_server::StreamService;
18use risingwave_pb::stream_service::*;
19use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
20use tokio::sync::mpsc::unbounded_channel;
21use tokio_stream::wrappers::UnboundedReceiverStream;
22use tonic::{Request, Response, Status, Streaming};
23
24#[derive(Clone)]
25pub struct StreamServiceImpl {
26 pub mgr: LocalStreamManager,
27 pub env: StreamEnvironment,
28}
29
30impl StreamServiceImpl {
31 pub fn new(mgr: LocalStreamManager, env: StreamEnvironment) -> Self {
32 StreamServiceImpl { mgr, env }
33 }
34}
35
36#[async_trait::async_trait]
37impl StreamService for StreamServiceImpl {
38 type StreamingControlStreamStream =
39 impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;
40
41 async fn streaming_control_stream(
42 &self,
43 request: Request<Streaming<StreamingControlStreamRequest>>,
44 ) -> Result<Response<Self::StreamingControlStreamStream>, Status> {
45 let mut stream = request.into_inner().boxed();
46 let first_request = stream.try_next().await?;
47 let Some(StreamingControlStreamRequest {
48 request: Some(streaming_control_stream_request::Request::Init(init_request)),
49 }) = first_request
50 else {
51 return Err(Status::invalid_argument(format!(
52 "unexpected first request: {:?}",
53 first_request
54 )));
55 };
56 let (tx, rx) = unbounded_channel();
57 self.mgr.handle_new_control_stream(tx, stream, init_request);
58 Ok(Response::new(UnboundedReceiverStream::new(rx)))
59 }
60
61 async fn get_min_uncommitted_sst_id(
62 &self,
63 _request: Request<GetMinUncommittedSstIdRequest>,
64 ) -> Result<Response<GetMinUncommittedSstIdResponse>, Status> {
65 let min_uncommitted_sst_id = if let Some(hummock) = self.mgr.env.state_store().as_hummock()
66 {
67 hummock
68 .min_uncommitted_sst_id()
69 .await
70 .unwrap_or(HummockSstableObjectId::MAX)
71 } else {
72 HummockSstableObjectId::MAX
73 };
74 Ok(Response::new(GetMinUncommittedSstIdResponse {
75 min_uncommitted_sst_id,
76 }))
77 }
78}