// risingwave_meta/manager/event_log.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, VecDeque};
16use std::sync::Arc;
17use std::time::SystemTime;
18
19use parking_lot::RwLock;
20use risingwave_pb::meta::EventLog as PbEventLog;
21use risingwave_pb::meta::event_log::{Event as PbEvent, Event};
22use tokio::task::JoinHandle;
23
/// Shared handle to the event log manager.
pub type EventLogManagerRef = Arc<EventLogManger>;
/// Producer side of the channel feeding the event log worker task.
type EventLogSender = tokio::sync::mpsc::Sender<EventLog>;
/// Oneshot sender used to request the worker task to stop.
type ShutdownSender = tokio::sync::oneshot::Sender<()>;
27
/// Channel determines expiration strategy.
///
/// Currently all channels apply the same one strategy: keep latest N events.
///
/// `BarrierComplete` is retained separately per database.
#[derive(Hash, PartialEq, Eq)]
enum ChannelId {
    /// One channel per event type, keyed by a fixed per-type id.
    EventType(u32),
    /// One channel per database, keyed by database id.
    BarrierComplete(u32),
}
/// FIFO queue of events in one channel; oldest at the front (events are
/// pushed to the back and expired from the front).
type Channel = VecDeque<EventLog>;
/// Shared in-memory event store: one bounded queue per channel id.
type EventStoreRef = Arc<RwLock<HashMap<ChannelId, Channel>>>;
40
/// Spawns a task that's responsible for event log insertion and expiration.
///
/// Returns a manager holding the sender side of the event channel, the
/// worker's join handle plus shutdown sender, and a shared view of the event
/// store. When `enabled` is false, the worker exits immediately and event
/// submission becomes a no-op.
pub fn start_event_log_manager(enabled: bool, event_log_channel_max_size: u32) -> EventLogManger {
    use futures::FutureExt;
    // Capacity of the mpsc buffer between producers and the worker; producers
    // drop events rather than block when it is full (see `add_event_logs`).
    const BUFFER_SIZE: usize = 1024;
    let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<EventLog>(BUFFER_SIZE);
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
    // `shared()` makes the oneshot receiver cloneable so it can be polled
    // afresh on every loop iteration below.
    let shutdown_rx_shared = shutdown_rx.shared();
    let event_logs: EventStoreRef = Arc::new(Default::default());
    let event_logs_shared = event_logs.clone();
    let worker_loop = async move {
        if !enabled {
            return;
        }
        loop {
            // Biased select: the shutdown branch is polled first, so a
            // pending shutdown wins over queued events.
            futures::select_biased! {
                _ = shutdown_rx_shared.clone().fuse() => {
                    tracing::info!("event log worker is stopped");
                    return;
                },
                event_log = event_rx.recv().fuse() => {
                    // `None` means all senders have been dropped; stop.
                    let Some(event_log) = event_log else {
                        tracing::info!("event log worker is stopped");
                        return;
                    };
                    let mut write = event_logs_shared.write();
                    let channel_id: ChannelId = (&event_log).into();
                    let channel = write.entry(channel_id).or_default();
                    channel.push_back(event_log);
                    // Apply expiration strategies.
                    keep_latest_n(channel, event_log_channel_max_size as _);
                },
            }
        }
    };
    let worker_join_handle = tokio::spawn(worker_loop);
    EventLogManger::new(
        event_tx,
        (worker_join_handle, shutdown_tx),
        enabled,
        event_logs,
    )
}
83
/// Internal wrapper around the protobuf event log payload that is sent from
/// producers to the worker task.
struct EventLog {
    payload: PbEventLog,
}
87
/// In-memory event log manager; constructed via `start_event_log_manager`.
pub struct EventLogManger {
    /// Sends event logs to the worker task for insertion.
    event_tx: EventLogSender,
    /// Worker join handle and shutdown sender; `None` once taken.
    worker_join_handle: RwLock<Option<(JoinHandle<()>, ShutdownSender)>>,
    /// When false, `add_event_logs` is a no-op.
    enabled: bool,
    /// Shared store read by `list_event_logs` and written by the worker.
    event_logs: EventStoreRef,
}
94
95impl EventLogManger {
96    fn new(
97        event_tx: EventLogSender,
98        worker_join_handle: (JoinHandle<()>, ShutdownSender),
99        enabled: bool,
100        event_logs: EventStoreRef,
101    ) -> Self {
102        if !enabled {
103            tracing::info!("event log is disabled");
104        }
105        Self {
106            event_tx,
107            worker_join_handle: RwLock::new(Some(worker_join_handle)),
108            enabled,
109            event_logs,
110        }
111    }
112
113    #[cfg(any(test, feature = "test"))]
114    pub fn for_test() -> Self {
115        let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
116        Self {
117            event_tx,
118            worker_join_handle: Default::default(),
119            enabled: false,
120            event_logs: Arc::new(Default::default()),
121        }
122    }
123
124    pub fn take_join_handle(&self) -> Option<(JoinHandle<()>, ShutdownSender)> {
125        self.worker_join_handle.write().take()
126    }
127
128    pub fn add_event_logs(&self, events: Vec<PbEvent>) {
129        if !self.enabled {
130            return;
131        }
132        let processing_ts = SystemTime::now()
133            .duration_since(SystemTime::UNIX_EPOCH)
134            .unwrap()
135            .as_millis() as u64;
136        for event in events {
137            let event_log = EventLog {
138                payload: PbEventLog {
139                    unique_id: Some(uuid::Uuid::new_v4().to_string()),
140                    timestamp: Some(processing_ts),
141                    event: Some(event),
142                },
143            };
144            // Intentionally drop event logs if any error of buffer is full.
145            if self.event_tx.try_send(event_log).is_err() {
146                tracing::warn!("some event logs have been dropped");
147                break;
148            }
149        }
150    }
151
152    pub fn list_event_logs(&self) -> Vec<PbEventLog> {
153        self.event_logs
154            .read()
155            .values()
156            .flat_map(|v| v.iter().map(|e| e.payload.clone()))
157            .collect()
158    }
159}
160
161fn keep_latest_n(channel: &mut Channel, max_n: usize) {
162    while channel.len() > max_n {
163        channel.pop_front();
164    }
165}
166
167// TODO: avoid manual implementation
168impl From<&EventLog> for ChannelId {
169    fn from(value: &EventLog) -> Self {
170        match value.payload.event.as_ref().unwrap() {
171            Event::CreateStreamJobFail(_) => ChannelId::EventType(1),
172            Event::DirtyStreamJobClear(_) => ChannelId::EventType(2),
173            Event::MetaNodeStart(_) => ChannelId::EventType(3),
174            Event::BarrierComplete(event) => ChannelId::BarrierComplete(event.database_id),
175            Event::InjectBarrierFail(_) => ChannelId::EventType(5),
176            Event::CollectBarrierFail(_) => ChannelId::EventType(6),
177            Event::WorkerNodePanic(_) => ChannelId::EventType(7),
178            Event::AutoSchemaChangeFail(_) => ChannelId::EventType(8),
179            Event::SinkFail(_) => ChannelId::EventType(9),
180            Event::Recovery(_) => ChannelId::EventType(10),
181        }
182    }
183}