risingwave_meta_service/
sink_coordination_service.rs1use futures::{Stream, StreamExt};
16use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationService;
17use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse};
18use tonic::{Request, Response, Status, Streaming};
19
20use crate::manager::sink_coordination::SinkCoordinatorManager;
21
22#[derive(Clone)]
23pub struct SinkCoordinationServiceImpl {
24 sink_manager: SinkCoordinatorManager,
25}
26
27impl SinkCoordinationServiceImpl {
28 pub fn new(sink_manager: SinkCoordinatorManager) -> Self {
29 Self { sink_manager }
30 }
31}
32
33#[async_trait::async_trait]
34impl SinkCoordinationService for SinkCoordinationServiceImpl {
35 type CoordinateStream = impl Stream<Item = Result<CoordinateResponse, Status>>;
36
37 async fn coordinate(
38 &self,
39 request: Request<Streaming<CoordinateRequest>>,
40 ) -> Result<Response<Self::CoordinateStream>, Status> {
41 let stream = request.into_inner();
42 Ok(Response::new(
43 self.sink_manager.handle_new_request(stream.boxed()).await?,
44 ))
45 }
46}