risingwave_meta_service/
event_log_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_meta::manager::event_log::EventLogManagerRef;
16use risingwave_pb::meta::event_log_service_server::EventLogService;
17use risingwave_pb::meta::{
18    AddEventLogRequest, AddEventLogResponse, ListEventLogRequest, ListEventLogResponse,
19};
20use tonic::{Request, Response, Status};
21
22pub struct EventLogServiceImpl {
23    event_log_manager: EventLogManagerRef,
24}
25
26impl EventLogServiceImpl {
27    pub fn new(event_log_manager: EventLogManagerRef) -> Self {
28        Self { event_log_manager }
29    }
30}
31
32#[async_trait::async_trait]
33impl EventLogService for EventLogServiceImpl {
34    async fn list_event_log(
35        &self,
36        _request: Request<ListEventLogRequest>,
37    ) -> Result<Response<ListEventLogResponse>, Status> {
38        let event_logs = self.event_log_manager.list_event_logs();
39        Ok(Response::new(ListEventLogResponse { event_logs }))
40    }
41
42    async fn add_event_log(
43        &self,
44        request: Request<AddEventLogRequest>,
45    ) -> Result<Response<AddEventLogResponse>, Status> {
46        let Some(event) = request.into_inner().event else {
47            return Ok(Response::new(AddEventLogResponse {}));
48        };
49        let e = match event {
50            risingwave_pb::meta::add_event_log_request::Event::WorkerNodePanic(e) => {
51                risingwave_pb::meta::event_log::Event::WorkerNodePanic(e)
52            }
53            risingwave_pb::meta::add_event_log_request::Event::SinkFail(e) => {
54                risingwave_pb::meta::event_log::Event::SinkFail(e)
55            }
56        };
57        self.event_log_manager.add_event_logs(vec![e]);
58        Ok(Response::new(AddEventLogResponse {}))
59    }
60}