risingwave_meta/manager/
event_log.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use parking_lot::RwLock;
20use risingwave_pb::meta::EventLog as PbEventLog;
21use risingwave_pb::meta::event_log::{Event as PbEvent, Event};
22use tokio::task::JoinHandle;
23
/// Shared handle to the manager (name keeps the existing, misspelled identifier).
pub type EventLogManagerRef = Arc<EventLogManger>;
/// Producer end of the bounded queue feeding the background worker.
type EventLogSender = tokio::sync::mpsc::Sender<EventLog>;
/// One-shot trigger used to stop the background worker.
type ShutdownSender = tokio::sync::oneshot::Sender<()>;

/// Channel determines expiration strategy.
///
/// Currently all channels apply the same one strategy: keep latest N events.
///
/// Currently each event type has its own channel.
type ChannelId = u32;
/// FIFO of events for one channel; oldest entries are evicted first.
type Channel = VecDeque<EventLog>;
/// In-memory event store, shared between the worker (writer) and readers.
type EventStoreRef = Arc<RwLock<HashMap<ChannelId, Channel>>>;
36
37/// Spawns a task that's responsible for event log insertion and expiration.
38pub fn start_event_log_manager(enabled: bool, event_log_channel_max_size: u32) -> EventLogManger {
39    use futures::FutureExt;
40    const BUFFER_SIZE: usize = 1024;
41    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<EventLog>(BUFFER_SIZE);
42    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
43    let shutdown_rx_shared = shutdown_rx.shared();
44    let event_logs: EventStoreRef = Arc::new(Default::default());
45    let event_logs_shared = event_logs.clone();
46    let worker_loop = async move {
47        if !enabled {
48            return;
49        }
50        loop {
51            futures::select_biased! {
52                _ = shutdown_rx_shared.clone().fuse() => {
53                    tracing::info!("event log worker is stopped");
54                    return;
55                },
56                event_log = event_rx.recv().fuse() => {
57                    let Some(event_log) = event_log else {
58                        tracing::info!("event log worker is stopped");
59                        return;
60                    };
61                    let mut write = event_logs_shared.write();
62                    let channel_id: ChannelId = (&event_log).into();
63                    let channel = write.entry(channel_id).or_default();
64                    channel.push_back(event_log);
65                    // Apply expiration strategies.
66                    keep_latest_n(channel, event_log_channel_max_size as _);
67                },
68            }
69        }
70    };
71    let worker_join_handle = tokio::spawn(worker_loop);
72    EventLogManger::new(
73        event_tx,
74        (worker_join_handle, shutdown_tx),
75        enabled,
76        event_logs,
77    )
78}
79
/// In-memory wrapper around one event log entry.
struct EventLog {
    // Protobuf payload; returned verbatim (cloned) by `list_event_logs`.
    payload: PbEventLog,
}
83
/// Entry point for recording and listing event logs.
///
/// NOTE(review): the identifier is misspelled ("Manger"); kept as-is because
/// it is part of the public interface.
pub struct EventLogManger {
    // Best-effort sender feeding the background worker.
    event_tx: EventLogSender,
    // Worker task handle and its shutdown trigger; taken once at shutdown.
    worker_join_handle: RwLock<Option<(JoinHandle<()>, ShutdownSender)>>,
    // When false, `add_event_logs` is a no-op.
    enabled: bool,
    // Store shared with the worker; read by `list_event_logs`.
    event_logs: EventStoreRef,
}
90
91impl EventLogManger {
92    fn new(
93        event_tx: EventLogSender,
94        worker_join_handle: (JoinHandle<()>, ShutdownSender),
95        enabled: bool,
96        event_logs: EventStoreRef,
97    ) -> Self {
98        if !enabled {
99            tracing::info!("event log is disabled");
100        }
101        Self {
102            event_tx,
103            worker_join_handle: RwLock::new(Some(worker_join_handle)),
104            enabled,
105            event_logs,
106        }
107    }
108
109    #[cfg(any(test, feature = "test"))]
110    pub fn for_test() -> Self {
111        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
112        Self {
113            event_tx,
114            worker_join_handle: Default::default(),
115            enabled: false,
116            event_logs: Arc::new(Default::default()),
117        }
118    }
119
120    pub fn take_join_handle(&self) -> Option<(JoinHandle<()>, ShutdownSender)> {
121        self.worker_join_handle.write().take()
122    }
123
124    pub fn add_event_logs(&self, events: Vec<PbEvent>) {
125        if !self.enabled {
126            return;
127        }
128        let processing_ts = SystemTime::now()
129            .duration_since(SystemTime::UNIX_EPOCH)
130            .unwrap()
131            .as_millis() as u64;
132        for event in events {
133            let event_log = EventLog {
134                payload: PbEventLog {
135                    unique_id: Some(uuid::Uuid::new_v4().to_string()),
136                    timestamp: Some(processing_ts),
137                    event: Some(event),
138                },
139            };
140            // Intentionally drop event logs if any error of buffer is full.
141            if self.event_tx.try_send(event_log).is_err() {
142                tracing::warn!("some event logs have been dropped");
143                break;
144            }
145        }
146    }
147
148    pub fn list_event_logs(&self) -> Vec<PbEventLog> {
149        self.event_logs
150            .read()
151            .values()
152            .flat_map(|v| v.iter().map(|e| e.payload.to_owned()))
153            .collect()
154    }
155}
156
157fn keep_latest_n(channel: &mut Channel, max_n: usize) {
158    while channel.len() > max_n {
159        channel.pop_front();
160    }
161}
162
163// TODO: avoid manual implementation
164impl From<&EventLog> for ChannelId {
165    fn from(value: &EventLog) -> Self {
166        match value.payload.event.as_ref().unwrap() {
167            Event::CreateStreamJobFail(_) => 1,
168            Event::DirtyStreamJobClear(_) => 2,
169            Event::MetaNodeStart(_) => 3,
170            Event::BarrierComplete(_) => 4,
171            Event::InjectBarrierFail(_) => 5,
172            Event::CollectBarrierFail(_) => 6,
173            Event::WorkerNodePanic(_) => 7,
174            Event::AutoSchemaChangeFail(_) => 8,
175            Event::SinkFail(_) => 9,
176            Event::Recovery(_) => 10,
177        }
178    }
179}