risingwave_storage/hummock/event_handler/
mod.rs1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use parking_lot::{RwLock, RwLockReadGuard};
19use risingwave_common::bitmap::Bitmap;
20use risingwave_common::catalog::TableId;
21use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};
22use thiserror_ext::AsReport;
23use tokio::sync::oneshot;
24
25use crate::hummock::HummockResult;
26use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
27use crate::mem_table::ImmutableMemtable;
28use crate::store::SealCurrentEpochOptions;
29
30pub mod hummock_event_handler;
31pub mod refiller;
32pub mod uploader;
33
34pub use hummock_event_handler::HummockEventHandler;
35use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
36
37use super::store::version::HummockReadVersion;
38use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
39use crate::hummock::event_handler::uploader::SyncedData;
40
41#[derive(Debug)]
42pub struct BufferWriteRequest {
43 pub batch: SharedBufferBatch,
44 pub epoch: HummockEpoch,
45 pub grant_sender: oneshot::Sender<()>,
46}
47
48#[derive(Debug)]
49pub enum HummockVersionUpdate {
50 VersionDeltas(Vec<HummockVersionDelta>),
51 PinnedVersion(Box<HummockVersion>),
52}
53
54pub enum HummockEvent {
55 BufferMayFlush,
57
58 SyncEpoch {
62 sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
63 sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
64 },
65
66 Clear(oneshot::Sender<()>, Option<HashSet<TableId>>),
68
69 Shutdown,
70
71 ImmToUploader {
72 instance_id: SharedBufferBatchId,
73 imm: ImmutableMemtable,
74 },
75
76 StartEpoch {
77 epoch: HummockEpoch,
78 table_ids: HashSet<TableId>,
79 },
80
81 InitEpoch {
82 instance_id: LocalInstanceId,
83 init_epoch: HummockEpoch,
84 },
85
86 LocalSealEpoch {
87 instance_id: LocalInstanceId,
88 next_epoch: HummockEpoch,
89 opts: SealCurrentEpochOptions,
90 },
91
92 #[cfg(any(test, feature = "test"))]
93 FlushEvent(oneshot::Sender<()>),
96
97 RegisterReadVersion {
98 table_id: TableId,
99 new_read_version_sender: oneshot::Sender<(HummockReadVersionRef, LocalInstanceGuard)>,
100 is_replicated: bool,
101 vnodes: Arc<Bitmap>,
102 },
103
104 DestroyReadVersion {
105 instance_id: LocalInstanceId,
106 },
107
108 GetMinUncommittedSstId {
109 result_tx: oneshot::Sender<Option<HummockSstableObjectId>>,
110 },
111}
112
113impl HummockEvent {
114 fn to_debug_string(&self) -> String {
115 match self {
116 HummockEvent::BufferMayFlush => "BufferMayFlush".to_owned(),
117
118 HummockEvent::SyncEpoch {
119 sync_result_sender: _,
120 sync_table_epochs,
121 } => format!("AwaitSyncEpoch epoch {:?}", sync_table_epochs),
122
123 HummockEvent::Clear(_, table_ids) => {
124 format!("Clear {:?}", table_ids)
125 }
126
127 HummockEvent::Shutdown => "Shutdown".to_owned(),
128
129 HummockEvent::StartEpoch { epoch, table_ids } => {
130 format!("StartEpoch {} {:?}", epoch, table_ids)
131 }
132
133 HummockEvent::InitEpoch {
134 instance_id,
135 init_epoch,
136 } => {
137 format!("InitEpoch {} {}", instance_id, init_epoch)
138 }
139
140 HummockEvent::ImmToUploader { instance_id, imm } => {
141 format!("ImmToUploader {} {}", instance_id, imm.batch_id())
142 }
143
144 HummockEvent::LocalSealEpoch {
145 instance_id,
146 next_epoch,
147 opts,
148 } => {
149 format!(
150 "LocalSealEpoch next_epoch: {}, instance_id: {}, opts: {:?}",
151 next_epoch, instance_id, opts
152 )
153 }
154
155 HummockEvent::RegisterReadVersion {
156 table_id,
157 new_read_version_sender: _,
158 is_replicated,
159 vnodes: _,
160 } => format!(
161 "RegisterReadVersion table_id {:?}, is_replicated: {:?}",
162 table_id, is_replicated
163 ),
164
165 HummockEvent::DestroyReadVersion { instance_id } => {
166 format!("DestroyReadVersion instance_id {:?}", instance_id)
167 }
168
169 #[cfg(any(test, feature = "test"))]
170 HummockEvent::FlushEvent(_) => "FlushEvent".to_owned(),
171 HummockEvent::GetMinUncommittedSstId { .. } => "GetMinSpilledSstId".to_owned(),
172 }
173 }
174}
175
176impl std::fmt::Debug for HummockEvent {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 f.debug_struct("HummockEvent")
179 .field("debug_string", &self.to_debug_string())
180 .finish()
181 }
182}
183
184pub type LocalInstanceId = u64;
185pub const TEST_LOCAL_INSTANCE_ID: LocalInstanceId = 233;
186pub type HummockReadVersionRef = Arc<RwLock<HummockReadVersion>>;
187pub type ReadVersionMappingType = HashMap<TableId, HashMap<LocalInstanceId, HummockReadVersionRef>>;
188pub type ReadOnlyReadVersionMapping = ReadOnlyRwLockRef<ReadVersionMappingType>;
189
190pub struct ReadOnlyRwLockRef<T>(Arc<RwLock<T>>);
191
192impl<T> Clone for ReadOnlyRwLockRef<T> {
193 fn clone(&self) -> Self {
194 Self(self.0.clone())
195 }
196}
197
198impl<T> ReadOnlyRwLockRef<T> {
199 pub fn new(inner: Arc<RwLock<T>>) -> Self {
200 Self(inner)
201 }
202
203 pub fn read(&self) -> RwLockReadGuard<'_, T> {
204 self.0.read()
205 }
206}
207
208pub struct LocalInstanceGuard {
209 pub table_id: TableId,
210 pub instance_id: LocalInstanceId,
211 event_sender: Option<HummockEventSender>,
213}
214
215impl Drop for LocalInstanceGuard {
216 fn drop(&mut self) {
217 if let Some(sender) = self.event_sender.take() {
218 sender
221 .send(HummockEvent::DestroyReadVersion {
222 instance_id: self.instance_id,
223 })
224 .unwrap_or_else(|err| {
225 tracing::debug!(
226 error = %err.as_report(),
227 table_id = %self.table_id,
228 instance_id = self.instance_id,
229 "LocalInstanceGuard Drop SendError",
230 )
231 })
232 }
233 }
234}