risingwave_meta/manager/
event_log.rs1use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use parking_lot::RwLock;
20use risingwave_pb::meta::EventLog as PbEventLog;
21use risingwave_pb::meta::event_log::{Event as PbEvent, Event};
22use tokio::task::JoinHandle;
23
24pub type EventLogManagerRef = Arc<EventLogManger>;
25type EventLogSender = tokio::sync::mpsc::Sender<EventLog>;
26type ShutdownSender = tokio::sync::oneshot::Sender<()>;
27
28type ChannelId = u32;
34type Channel = VecDeque<EventLog>;
35type EventStoreRef = Arc<RwLock<HashMap<ChannelId, Channel>>>;
36
37pub fn start_event_log_manager(enabled: bool, event_log_channel_max_size: u32) -> EventLogManger {
39 use futures::FutureExt;
40 const BUFFER_SIZE: usize = 1024;
41 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<EventLog>(BUFFER_SIZE);
42 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
43 let shutdown_rx_shared = shutdown_rx.shared();
44 let event_logs: EventStoreRef = Arc::new(Default::default());
45 let event_logs_shared = event_logs.clone();
46 let worker_loop = async move {
47 if !enabled {
48 return;
49 }
50 loop {
51 futures::select_biased! {
52 _ = shutdown_rx_shared.clone().fuse() => {
53 tracing::info!("event log worker is stopped");
54 return;
55 },
56 event_log = event_rx.recv().fuse() => {
57 let Some(event_log) = event_log else {
58 tracing::info!("event log worker is stopped");
59 return;
60 };
61 let mut write = event_logs_shared.write();
62 let channel_id: ChannelId = (&event_log).into();
63 let channel = write.entry(channel_id).or_default();
64 channel.push_back(event_log);
65 keep_latest_n(channel, event_log_channel_max_size as _);
67 },
68 }
69 }
70 };
71 let worker_join_handle = tokio::spawn(worker_loop);
72 EventLogManger::new(
73 event_tx,
74 (worker_join_handle, shutdown_tx),
75 enabled,
76 event_logs,
77 )
78}
79
80struct EventLog {
81 payload: PbEventLog,
82}
83
84pub struct EventLogManger {
85 event_tx: EventLogSender,
86 worker_join_handle: RwLock<Option<(JoinHandle<()>, ShutdownSender)>>,
87 enabled: bool,
88 event_logs: EventStoreRef,
89}
90
91impl EventLogManger {
92 fn new(
93 event_tx: EventLogSender,
94 worker_join_handle: (JoinHandle<()>, ShutdownSender),
95 enabled: bool,
96 event_logs: EventStoreRef,
97 ) -> Self {
98 if !enabled {
99 tracing::info!("event log is disabled");
100 }
101 Self {
102 event_tx,
103 worker_join_handle: RwLock::new(Some(worker_join_handle)),
104 enabled,
105 event_logs,
106 }
107 }
108
109 #[cfg(any(test, feature = "test"))]
110 pub fn for_test() -> Self {
111 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
112 Self {
113 event_tx,
114 worker_join_handle: Default::default(),
115 enabled: false,
116 event_logs: Arc::new(Default::default()),
117 }
118 }
119
120 pub fn take_join_handle(&self) -> Option<(JoinHandle<()>, ShutdownSender)> {
121 self.worker_join_handle.write().take()
122 }
123
124 pub fn add_event_logs(&self, events: Vec<PbEvent>) {
125 if !self.enabled {
126 return;
127 }
128 let processing_ts = SystemTime::now()
129 .duration_since(SystemTime::UNIX_EPOCH)
130 .unwrap()
131 .as_millis() as u64;
132 for event in events {
133 let event_log = EventLog {
134 payload: PbEventLog {
135 unique_id: Some(uuid::Uuid::new_v4().to_string()),
136 timestamp: Some(processing_ts),
137 event: Some(event),
138 },
139 };
140 if self.event_tx.try_send(event_log).is_err() {
142 tracing::warn!("some event logs have been dropped");
143 break;
144 }
145 }
146 }
147
148 pub fn list_event_logs(&self) -> Vec<PbEventLog> {
149 self.event_logs
150 .read()
151 .values()
152 .flat_map(|v| v.iter().map(|e| e.payload.to_owned()))
153 .collect()
154 }
155}
156
157fn keep_latest_n(channel: &mut Channel, max_n: usize) {
158 while channel.len() > max_n {
159 channel.pop_front();
160 }
161}
162
163impl From<&EventLog> for ChannelId {
165 fn from(value: &EventLog) -> Self {
166 match value.payload.event.as_ref().unwrap() {
167 Event::CreateStreamJobFail(_) => 1,
168 Event::DirtyStreamJobClear(_) => 2,
169 Event::MetaNodeStart(_) => 3,
170 Event::BarrierComplete(_) => 4,
171 Event::InjectBarrierFail(_) => 5,
172 Event::CollectBarrierFail(_) => 6,
173 Event::WorkerNodePanic(_) => 7,
174 Event::AutoSchemaChangeFail(_) => 8,
175 Event::SinkFail(_) => 9,
176 Event::Recovery(_) => 10,
177 }
178 }
179}