risingwave_meta_service/
event_log_service.rs1use risingwave_meta::manager::event_log::EventLogManagerRef;
16use risingwave_pb::meta::event_log_service_server::EventLogService;
17use risingwave_pb::meta::{
18 AddEventLogRequest, AddEventLogResponse, ListEventLogRequest, ListEventLogResponse,
19};
20use tonic::{Request, Response, Status};
21
22pub struct EventLogServiceImpl {
23 event_log_manager: EventLogManagerRef,
24}
25
26impl EventLogServiceImpl {
27 pub fn new(event_log_manager: EventLogManagerRef) -> Self {
28 Self { event_log_manager }
29 }
30}
31
32#[async_trait::async_trait]
33impl EventLogService for EventLogServiceImpl {
34 async fn list_event_log(
35 &self,
36 _request: Request<ListEventLogRequest>,
37 ) -> Result<Response<ListEventLogResponse>, Status> {
38 let event_logs = self.event_log_manager.list_event_logs();
39 Ok(Response::new(ListEventLogResponse { event_logs }))
40 }
41
42 async fn add_event_log(
43 &self,
44 request: Request<AddEventLogRequest>,
45 ) -> Result<Response<AddEventLogResponse>, Status> {
46 let Some(event) = request.into_inner().event else {
47 return Ok(Response::new(AddEventLogResponse {}));
48 };
49 let e = match event {
50 risingwave_pb::meta::add_event_log_request::Event::WorkerNodePanic(e) => {
51 risingwave_pb::meta::event_log::Event::WorkerNodePanic(e)
52 }
53 risingwave_pb::meta::add_event_log_request::Event::SinkFail(e) => {
54 risingwave_pb::meta::event_log::Event::SinkFail(e)
55 }
56 };
57 self.event_log_manager.add_event_logs(vec![e]);
58 Ok(Response::new(AddEventLogResponse {}))
59 }
60}