risingwave_storage/hummock/event_handler/
mod.rs1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use parking_lot::{RwLock, RwLockReadGuard};
20use risingwave_common::bitmap::Bitmap;
21use risingwave_common::catalog::TableId;
22use risingwave_hummock_sdk::{HummockEpoch, HummockRawObjectId};
23use thiserror_ext::AsReport;
24use tokio::sync::oneshot;
25
26use crate::hummock::HummockResult;
27use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
28use crate::mem_table::ImmutableMemtable;
29use crate::store::SealCurrentEpochOptions;
30
31pub mod hummock_event_handler;
32pub mod refiller;
33pub mod uploader;
34
35pub use hummock_event_handler::HummockEventHandler;
36use risingwave_hummock_sdk::vector_index::VectorIndexAdd;
37use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
38
39use super::store::version::HummockReadVersion;
40use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
41use crate::hummock::event_handler::uploader::SyncedData;
42
43#[derive(Debug)]
44pub struct BufferWriteRequest {
45 pub batch: SharedBufferBatch,
46 pub epoch: HummockEpoch,
47 pub grant_sender: oneshot::Sender<()>,
48}
49
50#[derive(Debug)]
51pub enum HummockVersionUpdate {
52 VersionDeltas(Vec<HummockVersionDelta>),
53 PinnedVersion(Box<HummockVersion>),
54}
55
56pub enum HummockEvent {
57 BufferMayFlush,
59
60 SyncEpoch {
64 sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
65 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
66 },
67
68 Clear(oneshot::Sender<()>, Option<HashSet<TableId>>),
70
71 Shutdown,
72
73 ImmToUploader {
74 instance_id: SharedBufferBatchId,
75 imms: Vec<ImmutableMemtable>,
76 },
77
78 StartEpoch {
79 epoch: HummockEpoch,
80 table_ids: HashSet<TableId>,
81 },
82
83 InitEpoch {
84 instance_id: LocalInstanceId,
85 init_epoch: HummockEpoch,
86 },
87
88 LocalSealEpoch {
89 instance_id: LocalInstanceId,
90 next_epoch: HummockEpoch,
91 opts: SealCurrentEpochOptions,
92 },
93
94 #[cfg(any(test, feature = "test"))]
95 FlushEvent(oneshot::Sender<()>),
98
99 RegisterReadVersion {
100 table_id: TableId,
101 new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>,
102 is_replicated: bool,
103 vnodes: Arc<Bitmap>,
104 },
105
106 DestroyReadVersion {
107 instance_id: LocalInstanceId,
108 },
109
110 RegisterVectorWriter {
111 table_id: TableId,
112 init_epoch: HummockEpoch,
113 },
114
115 VectorWriterSealEpoch {
116 table_id: TableId,
117 next_epoch: HummockEpoch,
118 add: Option<VectorIndexAdd>,
119 },
120
121 DropVectorWriter {
122 table_id: TableId,
123 },
124
125 GetMinUncommittedObjectId {
126 result_tx: oneshot::Sender<Option<HummockRawObjectId>>,
127 },
128}
129
130impl HummockEvent {
131 fn to_debug_string(&self) -> String {
132 match self {
133 HummockEvent::BufferMayFlush => "BufferMayFlush".to_owned(),
134
135 HummockEvent::SyncEpoch {
136 sync_result_sender: _,
137 sync_table_epochs,
138 } => format!("AwaitSyncEpoch epoch {:?}", sync_table_epochs),
139
140 HummockEvent::Clear(_, table_ids) => {
141 format!("Clear {:?}", table_ids)
142 }
143
144 HummockEvent::Shutdown => "Shutdown".to_owned(),
145
146 HummockEvent::StartEpoch { epoch, table_ids } => {
147 format!("StartEpoch {} {:?}", epoch, table_ids)
148 }
149
150 HummockEvent::InitEpoch {
151 instance_id,
152 init_epoch,
153 } => {
154 format!("InitEpoch {} {}", instance_id, init_epoch)
155 }
156
157 HummockEvent::ImmToUploader { instance_id, imms } => {
158 format!(
159 "ImmToUploader {} {:?}",
160 instance_id,
161 imms.iter().map(|imm| imm.batch_id()).collect_vec()
162 )
163 }
164
165 HummockEvent::LocalSealEpoch {
166 instance_id,
167 next_epoch,
168 opts,
169 } => {
170 format!(
171 "LocalSealEpoch next_epoch: {}, instance_id: {}, opts: {:?}",
172 next_epoch, instance_id, opts
173 )
174 }
175
176 HummockEvent::RegisterReadVersion {
177 table_id,
178 new_read_version_sender: _,
179 is_replicated,
180 vnodes: _,
181 } => format!(
182 "RegisterReadVersion table_id {:?}, is_replicated: {:?}",
183 table_id, is_replicated
184 ),
185
186 HummockEvent::DestroyReadVersion { instance_id } => {
187 format!("DestroyReadVersion instance_id {:?}", instance_id)
188 }
189
190 #[cfg(any(test, feature = "test"))]
191 HummockEvent::FlushEvent(_) => "FlushEvent".to_owned(),
192 HummockEvent::GetMinUncommittedObjectId { .. } => {
193 "GetMinUncommittedObjectId".to_owned()
194 }
195 HummockEvent::RegisterVectorWriter { .. } => "RegisterVectorWriter".to_owned(),
196 HummockEvent::VectorWriterSealEpoch { .. } => "VectorWriterSealEpoch".to_owned(),
197 HummockEvent::DropVectorWriter { .. } => "DropVectorWriter".to_owned(),
198 }
199 }
200}
201
202impl std::fmt::Debug for HummockEvent {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 f.debug_struct("HummockEvent")
205 .field("debug_string", &self.to_debug_string())
206 .finish()
207 }
208}
209
210pub type LocalInstanceId = u64;
211pub const TEST_LOCAL_INSTANCE_ID: LocalInstanceId = 233;
212pub type HummockReadVersionRef = Arc<RwLock<HummockReadVersion>>;
213pub type ReadVersionMappingType = HashMap<TableId, HashMap<LocalInstanceId, HummockReadVersionRef>>;
214pub type ReadOnlyReadVersionMapping = ReadOnlyRwLockRef<ReadVersionMappingType>;
215
216pub struct ReadOnlyRwLockRef<T>(Arc<RwLock<T>>);
217
218impl<T> Clone for ReadOnlyRwLockRef<T> {
219 fn clone(&self) -> Self {
220 Self(self.0.clone())
221 }
222}
223
224impl<T> ReadOnlyRwLockRef<T> {
225 pub fn new(inner: Arc<RwLock<T>>) -> Self {
226 Self(inner)
227 }
228
229 pub fn read(&self) -> RwLockReadGuard<'_, T> {
230 self.0.read()
231 }
232}
233
234pub struct LocalInstanceGuard {
235 pub table_id: TableId,
236 pub instance_id: LocalInstanceId,
237 event_sender: Option<HummockEventSender>,
239}
240
241impl Drop for LocalInstanceGuard {
242 fn drop(&mut self) {
243 if let Some(sender) = self.event_sender.take() {
244 sender
247 .send(HummockEvent::DestroyReadVersion {
248 instance_id: self.instance_id,
249 })
250 .unwrap_or_else(|err| {
251 tracing::debug!(
252 error = %err.as_report(),
253 table_id = %self.table_id,
254 instance_id = self.instance_id,
255 "LocalInstanceGuard Drop SendError",
256 )
257 })
258 }
259 }
260}