risingwave_storage/hummock/event_handler/
mod.rs1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use parking_lot::{RwLock, RwLockReadGuard};
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId};
23use thiserror_ext::AsReport;
24use tokio::sync::oneshot;
25
26use crate::hummock::HummockResult;
27use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
28use crate::mem_table::ImmutableMemtable;
29use crate::store::SealCurrentEpochOptions;
30
31pub mod hummock_event_handler;
32pub mod refiller;
33pub mod uploader;
34
35pub(crate) use hummock_event_handler::HummockEventHandler;
36use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
37use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
38
39use super::store::version::HummockReadVersion;
40use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
41use crate::hummock::event_handler::uploader::SyncedData;
42use crate::hummock::utils::MemoryTracker;
43
44#[derive(Debug)]
45pub struct BufferWriteRequest {
46 pub batch: SharedBufferBatch,
47 pub epoch: HummockEpoch,
48 pub grant_sender: oneshot::Sender<()>,
49}
50
51#[derive(Debug)]
52pub enum HummockVersionUpdate {
53 VersionDeltas(Vec<HummockVersionDelta>),
54 PinnedVersion(Box<HummockVersion>),
55}
56
57pub enum HummockEvent {
58 BufferMayFlush,
60
61 SyncEpoch {
65 sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
66 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
67 },
68
69 Clear(oneshot::Sender<()>, Option<HashSet<TableId>>),
71
72 Shutdown,
73
74 ImmToUploader {
75 instance_id: SharedBufferBatchId,
76 imms: Vec<(ImmutableMemtable, MemoryTracker)>,
77 },
78
79 StartEpoch {
80 epoch: HummockEpoch,
81 table_ids: HashSet<TableId>,
82 },
83
84 InitEpoch {
85 instance_id: LocalInstanceId,
86 init_epoch: HummockEpoch,
87 },
88
89 LocalSealEpoch {
90 instance_id: LocalInstanceId,
91 next_epoch: HummockEpoch,
92 opts: SealCurrentEpochOptions,
93 },
94
95 #[cfg(any(test, feature = "test"))]
96 FlushEvent(oneshot::Sender<()>),
99
100 RegisterReadVersion {
101 table_id: TableId,
102 new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>,
103 is_replicated: bool,
104 vnodes: Arc<Bitmap>,
105 },
106
107 DestroyReadVersion {
108 instance_id: LocalInstanceId,
109 },
110
111 RegisterVectorWriter {
112 table_id: TableId,
113 init_epoch: HummockEpoch,
114 },
115
116 VectorWriterSealEpoch {
117 table_id: TableId,
118 next_epoch: HummockEpoch,
119 add: Option<VectorIndexAdd>,
120 },
121
122 DropVectorWriter {
123 table_id: TableId,
124 },
125
126 GetMinUncommittedObjectId {
127 result_tx: oneshot::Sender<Option<HummockRawObjectId>>,
128 },
129}
130
131impl HummockEvent {
132 fn to_debug_string(&self) -> String {
133 match self {
134 HummockEvent::BufferMayFlush => "BufferMayFlush".to_owned(),
135
136 HummockEvent::SyncEpoch {
137 sync_result_sender: _,
138 sync_table_epochs,
139 } => format!("AwaitSyncEpoch epoch {:?}", sync_table_epochs),
140
141 HummockEvent::Clear(_, table_ids) => {
142 format!("Clear {:?}", table_ids)
143 }
144
145 HummockEvent::Shutdown => "Shutdown".to_owned(),
146
147 HummockEvent::StartEpoch { epoch, table_ids } => {
148 format!("StartEpoch {} {:?}", epoch, table_ids)
149 }
150
151 HummockEvent::InitEpoch {
152 instance_id,
153 init_epoch,
154 } => {
155 format!("InitEpoch {} {}", instance_id, init_epoch)
156 }
157
158 HummockEvent::ImmToUploader { instance_id, imms } => {
159 format!(
160 "ImmToUploader {} {:?}",
161 instance_id,
162 imms.iter().map(|(imm, _)| imm.batch_id()).collect_vec()
163 )
164 }
165
166 HummockEvent::LocalSealEpoch {
167 instance_id,
168 next_epoch,
169 opts,
170 } => {
171 format!(
172 "LocalSealEpoch next_epoch: {}, instance_id: {}, opts: {:?}",
173 next_epoch, instance_id, opts
174 )
175 }
176
177 HummockEvent::RegisterReadVersion {
178 table_id,
179 new_read_version_sender: _,
180 is_replicated,
181 vnodes: _,
182 } => format!(
183 "RegisterReadVersion table_id {:?}, is_replicated: {:?}",
184 table_id, is_replicated
185 ),
186
187 HummockEvent::DestroyReadVersion { instance_id } => {
188 format!("DestroyReadVersion instance_id {:?}", instance_id)
189 }
190
191 #[cfg(any(test, feature = "test"))]
192 HummockEvent::FlushEvent(_) => "FlushEvent".to_owned(),
193 HummockEvent::GetMinUncommittedObjectId { .. } => {
194 "GetMinUncommittedObjectId".to_owned()
195 }
196 HummockEvent::RegisterVectorWriter { .. } => "RegisterVectorWriter".to_owned(),
197 HummockEvent::VectorWriterSealEpoch { .. } => "VectorWriterSealEpoch".to_owned(),
198 HummockEvent::DropVectorWriter { .. } => "DropVectorWriter".to_owned(),
199 }
200 }
201
202 pub fn event_name(&self) -> &'static str {
203 match self {
204 HummockEvent::BufferMayFlush => "BufferMayFlush",
205 HummockEvent::SyncEpoch { .. } => "SyncEpoch",
206 HummockEvent::Clear(..) => "Clear",
207 HummockEvent::Shutdown => "Shutdown",
208 HummockEvent::ImmToUploader { .. } => "ImmToUploader",
209 HummockEvent::StartEpoch { .. } => "StartEpoch",
210 HummockEvent::InitEpoch { .. } => "InitEpoch",
211 HummockEvent::LocalSealEpoch { .. } => "LocalSealEpoch",
212 #[cfg(any(test, feature = "test"))]
213 HummockEvent::FlushEvent(_) => "FlushEvent",
214 HummockEvent::RegisterReadVersion { .. } => "RegisterReadVersion",
215 HummockEvent::DestroyReadVersion { .. } => "DestroyReadVersion",
216 HummockEvent::RegisterVectorWriter { .. } => "RegisterVectorWriter",
217 HummockEvent::VectorWriterSealEpoch { .. } => "VectorWriterSealEpoch",
218 HummockEvent::DropVectorWriter { .. } => "DropVectorWriter",
219 HummockEvent::GetMinUncommittedObjectId { .. } => "GetMinUncommittedObjectId",
220 }
221 }
222}
223
224impl std::fmt::Debug for HummockEvent {
225 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226 f.debug_struct("HummockEvent")
227 .field("debug_string", &self.to_debug_string())
228 .finish()
229 }
230}
231
232pub type LocalInstanceId = u64;
233pub const TEST_LOCAL_INSTANCE_ID: LocalInstanceId = 233;
234pub type HummockReadVersionRef = Arc<RwLock<HummockReadVersion>>;
235pub type ReadVersionMappingType = HashMap<TableId, HashMap<LocalInstanceId, HummockReadVersionRef>>;
236pub type ReadOnlyReadVersionMapping = ReadOnlyRwLockRef<ReadVersionMappingType>;
237
238pub struct ReadOnlyRwLockRef<T>(Arc<RwLock<T>>);
239
240impl<T> Clone for ReadOnlyRwLockRef<T> {
241 fn clone(&self) -> Self {
242 Self(self.0.clone())
243 }
244}
245
246impl<T> ReadOnlyRwLockRef<T> {
247 pub fn new(inner: Arc<RwLock<T>>) -> Self {
248 Self(inner)
249 }
250
251 pub fn read(&self) -> RwLockReadGuard<'_, T> {
252 self.0.read()
253 }
254}
255
256pub struct LocalInstanceGuard {
257 pub table_id: TableId,
258 pub instance_id: LocalInstanceId,
259 event_sender: Option<HummockEventSender>,
261}
262
263impl Drop for LocalInstanceGuard {
264 fn drop(&mut self) {
265 if let Some(sender) = self.event_sender.take() {
266 sender
269 .send(HummockEvent::DestroyReadVersion {
270 instance_id: self.instance_id,
271 })
272 .unwrap_or_else(|err| {
273 tracing::debug!(
274 error = %err.as_report(),
275 table_id = %self.table_id,
276 instance_id = self.instance_id,
277 "LocalInstanceGuard Drop SendError",
278 )
279 })
280 }
281 }
282}