risingwave_meta/manager/
event_log.rs1use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use parking_lot::RwLock;
20use risingwave_pb::meta::EventLog as PbEventLog;
21use risingwave_pb::meta::event_log::{Event as PbEvent, Event};
22use tokio::task::JoinHandle;
23
24pub type EventLogManagerRef = Arc<EventLogManger>;
25type EventLogSender = tokio::sync::mpsc::Sender<EventLog>;
26type ShutdownSender = tokio::sync::oneshot::Sender<()>;
27
/// Key of one bounded history channel in the in-memory event store.
///
/// It is a small plain-data key, so it is `Copy` and `Debug` in addition to being hashable.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
enum ChannelId {
    /// One shared channel for every event of a given kind, identified by a numeric tag.
    EventType(u32),
    /// Barrier-completion events of a single database, keyed by that database's id.
    BarrierComplete(u32),
}
38type Channel = VecDeque<EventLog>;
39type EventStoreRef = Arc<RwLock<HashMap<ChannelId, Channel>>>;
40
41pub fn start_event_log_manager(enabled: bool, event_log_channel_max_size: u32) -> EventLogManger {
43 use futures::FutureExt;
44 const BUFFER_SIZE: usize = 1024;
45 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<EventLog>(BUFFER_SIZE);
46 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
47 let shutdown_rx_shared = shutdown_rx.shared();
48 let event_logs: EventStoreRef = Arc::new(Default::default());
49 let event_logs_shared = event_logs.clone();
50 let worker_loop = async move {
51 if !enabled {
52 return;
53 }
54 loop {
55 futures::select_biased! {
56 _ = shutdown_rx_shared.clone().fuse() => {
57 tracing::info!("event log worker is stopped");
58 return;
59 },
60 event_log = event_rx.recv().fuse() => {
61 let Some(event_log) = event_log else {
62 tracing::info!("event log worker is stopped");
63 return;
64 };
65 let mut write = event_logs_shared.write();
66 let channel_id: ChannelId = (&event_log).into();
67 let channel = write.entry(channel_id).or_default();
68 channel.push_back(event_log);
69 keep_latest_n(channel, event_log_channel_max_size as _);
71 },
72 }
73 }
74 };
75 let worker_join_handle = tokio::spawn(worker_loop);
76 EventLogManger::new(
77 event_tx,
78 (worker_join_handle, shutdown_tx),
79 enabled,
80 event_logs,
81 )
82}
83
84struct EventLog {
85 payload: PbEventLog,
86}
87
88pub struct EventLogManger {
89 event_tx: EventLogSender,
90 worker_join_handle: RwLock<Option<(JoinHandle<()>, ShutdownSender)>>,
91 enabled: bool,
92 event_logs: EventStoreRef,
93}
94
95impl EventLogManger {
96 fn new(
97 event_tx: EventLogSender,
98 worker_join_handle: (JoinHandle<()>, ShutdownSender),
99 enabled: bool,
100 event_logs: EventStoreRef,
101 ) -> Self {
102 if !enabled {
103 tracing::info!("event log is disabled");
104 }
105 Self {
106 event_tx,
107 worker_join_handle: RwLock::new(Some(worker_join_handle)),
108 enabled,
109 event_logs,
110 }
111 }
112
113 #[cfg(any(test, feature = "test"))]
114 pub fn for_test() -> Self {
115 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
116 Self {
117 event_tx,
118 worker_join_handle: Default::default(),
119 enabled: false,
120 event_logs: Arc::new(Default::default()),
121 }
122 }
123
124 pub fn take_join_handle(&self) -> Option<(JoinHandle<()>, ShutdownSender)> {
125 self.worker_join_handle.write().take()
126 }
127
128 pub fn add_event_logs(&self, events: Vec<PbEvent>) {
129 if !self.enabled {
130 return;
131 }
132 let processing_ts = SystemTime::now()
133 .duration_since(SystemTime::UNIX_EPOCH)
134 .unwrap()
135 .as_millis() as u64;
136 for event in events {
137 let event_log = EventLog {
138 payload: PbEventLog {
139 unique_id: Some(uuid::Uuid::new_v4().to_string()),
140 timestamp: Some(processing_ts),
141 event: Some(event),
142 },
143 };
144 if self.event_tx.try_send(event_log).is_err() {
146 tracing::warn!("some event logs have been dropped");
147 break;
148 }
149 }
150 }
151
152 pub fn list_event_logs(&self) -> Vec<PbEventLog> {
153 self.event_logs
154 .read()
155 .values()
156 .flat_map(|v| v.iter().map(|e| e.payload.clone()))
157 .collect()
158 }
159}
160
/// Trims `channel` so it keeps at most `max_n` entries, discarding the oldest (front) ones.
///
/// Generic over the element type; the event store passes its `Channel`
/// (a `VecDeque<EventLog>`). A `max_n` of 0 empties the channel.
fn keep_latest_n<T>(channel: &mut VecDeque<T>, max_n: usize) {
    let excess = channel.len().saturating_sub(max_n);
    // Remove the whole surplus in one pass instead of popping one element at a time.
    channel.drain(..excess);
}
166
167impl From<&EventLog> for ChannelId {
169 fn from(value: &EventLog) -> Self {
170 match value.payload.event.as_ref().unwrap() {
171 Event::CreateStreamJobFail(_) => ChannelId::EventType(1),
172 Event::DirtyStreamJobClear(_) => ChannelId::EventType(2),
173 Event::MetaNodeStart(_) => ChannelId::EventType(3),
174 Event::BarrierComplete(event) => ChannelId::BarrierComplete(event.database_id),
175 Event::InjectBarrierFail(_) => ChannelId::EventType(5),
176 Event::CollectBarrierFail(_) => ChannelId::EventType(6),
177 Event::WorkerNodePanic(_) => ChannelId::EventType(7),
178 Event::AutoSchemaChangeFail(_) => ChannelId::EventType(8),
179 Event::SinkFail(_) => ChannelId::EventType(9),
180 Event::Recovery(_) => ChannelId::EventType(10),
181 }
182 }
183}