// risingwave_frontend/scheduler/snapshot.rs
1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
21use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
22use risingwave_hummock_sdk::{
23 FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
24};
25use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch, batch_query_epoch};
26use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
27use tokio::sync::watch;
28
29use crate::error::{ErrorCode, RwError};
30use crate::expr::InlineNowProcTime;
31use crate::meta_client::FrontendMetaClient;
32use crate::scheduler::SchedulerError;
33
/// The storage snapshot a batch query reads from.
///
/// Cloning is cheap: `FrontendPinned` holds an `Arc` (`PinnedSnapshotRef`) and `Other` holds a
/// copyable `Epoch`.
#[derive(Clone)]
pub enum ReadSnapshot {
    /// A hummock version pinned by the frontend.
    FrontendPinned {
        /// Shared handle to the pinned version.
        snapshot: PinnedSnapshotRef,
    },

    /// Read without a pinned committed version. `batch_query_epoch` encodes it as
    /// `Current(u64::MAX)`, and it is the only variant for which `support_barrier_read`
    /// returns `true`.
    ReadUncommitted,

    /// An explicitly supplied epoch; `batch_query_epoch` encodes it as a `Backup` epoch.
    /// NOTE(review): presumably used to read from a meta snapshot backup — confirm.
    Other(Epoch),
}
49
50impl ReadSnapshot {
51 pub fn batch_query_epoch(
53 &self,
54 read_storage_tables: &HashSet<TableId>,
55 ) -> Result<BatchQueryEpoch, SchedulerError> {
56 Ok(match self {
57 ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch {
58 epoch: Some(batch_query_epoch::Epoch::Committed(
59 BatchQueryCommittedEpoch {
60 epoch: snapshot.batch_query_epoch(read_storage_tables)?.0,
61 hummock_version_id: snapshot.value.id.to_u64(),
62 },
63 )),
64 },
65 ReadSnapshot::ReadUncommitted => BatchQueryEpoch {
66 epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
67 },
68 ReadSnapshot::Other(e) => BatchQueryEpoch {
69 epoch: Some(batch_query_epoch::Epoch::Backup(e.0)),
70 },
71 })
72 }
73
74 pub fn inline_now_proc_time(&self) -> InlineNowProcTime {
75 let epoch = match self {
76 ReadSnapshot::FrontendPinned { snapshot } => snapshot
77 .value
78 .state_table_info
79 .max_table_committed_epoch()
80 .map(Epoch)
81 .unwrap_or_else(Epoch::now),
82 ReadSnapshot::ReadUncommitted => Epoch::now(),
83 ReadSnapshot::Other(epoch) => *epoch,
84 };
85 InlineNowProcTime::new(epoch)
86 }
87
88 pub fn support_barrier_read(&self) -> bool {
90 matches!(self, ReadSnapshot::ReadUncommitted)
91 }
92}
93
/// A frontend hummock version that queries read from.
///
/// `HummockSnapshotManager` never mutates a published snapshot in place; each new version is
/// wrapped in a fresh `Arc` (see [`PinnedSnapshotRef`]).
pub struct PinnedSnapshot {
    /// The underlying version; exposed read-only through `version()`.
    value: FrontendHummockVersion,
}
99
100impl std::fmt::Debug for PinnedSnapshot {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 self.value.fmt(f)
103 }
104}
105
/// Shared, reference-counted handle to a [`PinnedSnapshot`].
pub type PinnedSnapshotRef = Arc<PinnedSnapshot>;
108
109impl PinnedSnapshot {
110 fn batch_query_epoch(
111 &self,
112 read_storage_tables: &HashSet<TableId>,
113 ) -> Result<Epoch, SchedulerError> {
114 let epoch = read_storage_tables
116 .iter()
117 .map(|table_id| {
118 self.value
119 .state_table_info
120 .info()
121 .get(table_id)
122 .map(|info| Epoch(info.committed_epoch))
123 .ok_or_else(|| anyhow!("table id {table_id} may have been dropped"))
124 })
125 .try_fold(None, |prev_min_committed_epoch, committed_epoch| {
126 committed_epoch.map(|committed_epoch| {
127 if let Some(prev_min_committed_epoch) = prev_min_committed_epoch
128 && prev_min_committed_epoch <= committed_epoch
129 {
130 Some(prev_min_committed_epoch)
131 } else {
132 Some(committed_epoch)
133 }
134 })
135 })?
136 .unwrap_or_else(Epoch::now);
137 Ok(epoch)
138 }
139
140 pub fn version(&self) -> &FrontendHummockVersion {
141 &self.value
142 }
143
144 pub fn list_change_log_epochs(
145 &self,
146 table_id: u32,
147 min_epoch: u64,
148 max_count: u32,
149 ) -> Vec<u64> {
150 if let Some(table_change_log) = self.value.table_change_log.get(&TableId::new(table_id)) {
151 let table_change_log = table_change_log.clone();
152 table_change_log.get_non_empty_epochs(min_epoch, max_count as usize)
153 } else {
154 vec![]
155 }
156 }
157}
158
159fn invalid_snapshot() -> FrontendHummockVersion {
161 FrontendHummockVersion {
162 id: INVALID_VERSION_ID,
163 state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()),
164 table_change_log: Default::default(),
165 }
166}
167
/// Tracks the latest frontend hummock version and broadcasts table change-log notifications.
pub struct HummockSnapshotManager {
    /// The latest snapshot. It is replaced wholesale (a new `Arc`) through `send_if_modified`;
    /// readers clone the `Arc` via `acquire` or subscribe to wait for newer versions.
    latest_snapshot: watch::Sender<PinnedSnapshotRef>,

    /// Broadcasts, for each `init`/`update` call, which tables got new change-log entries and
    /// which tables were removed.
    table_change_log_notification_sender: watch::Sender<TableChangeLogNotificationMsg>,
}
183
/// Payload of the table change-log notification channel; table ids are raw `u32` values.
#[derive(Default)]
struct TableChangeLogNotificationMsg {
    /// Tables whose change log gained non-empty entries in the triggering `init`/`update`.
    updated_change_log_table_ids: HashSet<u32>,
    /// Tables listed in the `removed_table_ids` of the triggering `update` (empty for `init`).
    deleted_table_ids: HashSet<u32>,
}
189
/// Shared, reference-counted handle to a [`HummockSnapshotManager`].
pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
191
192impl HummockSnapshotManager {
193 pub fn new(_meta_client: Arc<dyn FrontendMetaClient>) -> Self {
194 let latest_snapshot = Arc::new(PinnedSnapshot {
195 value: invalid_snapshot(),
196 });
197
198 let (latest_snapshot, _) = watch::channel(latest_snapshot);
199
200 let (table_change_log_notification_sender, _) =
201 watch::channel(TableChangeLogNotificationMsg::default());
202
203 Self {
204 latest_snapshot,
205 table_change_log_notification_sender,
206 }
207 }
208
209 pub fn acquire(&self) -> PinnedSnapshotRef {
211 self.latest_snapshot.borrow().clone()
212 }
213
214 pub fn init(&self, version: FrontendHummockVersion) {
215 let updated_change_log_table_ids: HashSet<_> = version
216 .table_change_log
217 .iter()
218 .filter_map(|(table_id, change_log)| {
219 if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() {
220 None
221 } else {
222 Some(table_id.table_id())
223 }
224 })
225 .collect();
226 self.table_change_log_notification_sender
227 .send(TableChangeLogNotificationMsg {
228 updated_change_log_table_ids,
229 deleted_table_ids: Default::default(),
230 })
231 .ok();
232
233 self.update_inner(|_| Some(version));
234 }
235
236 pub fn update(&self, deltas: HummockVersionDeltas) {
240 let updated_change_log_table_ids: HashSet<_> = deltas
241 .version_deltas
242 .iter()
243 .flat_map(|version_deltas| &version_deltas.change_log_delta)
244 .filter_map(|(table_id, change_log)| match change_log.new_log.as_ref() {
245 Some(new_log) => {
246 let new_value_empty = new_log.new_value.is_empty();
247 let old_value_empty = new_log.old_value.is_empty();
248 if !new_value_empty || !old_value_empty {
249 Some(*table_id)
250 } else {
251 None
252 }
253 }
254 None => None,
255 })
256 .collect();
257 let deleted_table_ids: HashSet<_> = deltas
258 .version_deltas
259 .iter()
260 .flat_map(|version_deltas| version_deltas.removed_table_ids.clone())
261 .collect();
262 self.table_change_log_notification_sender
263 .send(TableChangeLogNotificationMsg {
264 updated_change_log_table_ids,
265 deleted_table_ids,
266 })
267 .ok();
268
269 self.update_inner(|old_snapshot| {
270 if deltas.version_deltas.is_empty() {
271 return None;
272 }
273 let mut snapshot = old_snapshot.clone();
274 for delta in deltas.version_deltas {
275 snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta));
276 }
277 Some(snapshot)
278 })
279 }
280
281 pub fn add_table_for_test(&self, table_id: TableId) {
282 self.update_inner(|version| {
283 let mut version = version.clone();
284 version.id = version.id.next();
285 version.state_table_info.apply_delta(
286 &HashMap::from_iter([(
287 table_id,
288 StateTableInfoDelta {
289 committed_epoch: INVALID_EPOCH,
290 compaction_group_id: 0,
291 },
292 )]),
293 &HashSet::new(),
294 );
295 Some(version)
296 });
297 }
298
299 fn update_inner(
300 &self,
301 get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option<FrontendHummockVersion>,
302 ) {
303 self.latest_snapshot.send_if_modified(move |old_snapshot| {
304 let new_snapshot = get_new_snapshot(&old_snapshot.value);
305 let Some(snapshot) = new_snapshot else {
306 return false;
307 };
308 if snapshot.id <= old_snapshot.value.id {
309 assert_eq!(
310 snapshot.id, old_snapshot.value.id,
311 "receive stale frontend version"
312 );
313 return false;
314 }
315 *old_snapshot = Arc::new(PinnedSnapshot { value: snapshot });
316
317 true
318 });
319 }
320
321 pub async fn wait(&self, version_id: HummockVersionId) {
323 let mut rx = self.latest_snapshot.subscribe();
324 while rx.borrow_and_update().value.id < version_id {
325 rx.changed().await.unwrap();
326 }
327 }
328
329 pub async fn wait_table_change_log_notification(&self, table_id: u32) -> Result<(), RwError> {
330 let mut rx = self.table_change_log_notification_sender.subscribe();
331 loop {
332 rx.changed()
333 .await
334 .map_err(|_| ErrorCode::InternalError("cursor notify channel is closed.".into()))?;
335 let table_change_log_notification_msg = rx.borrow_and_update();
336 if table_change_log_notification_msg
337 .deleted_table_ids
338 .contains(&table_id)
339 {
340 return Err(ErrorCode::InternalError(format!(
341 "Cursor dependent table deleted: table_id is {:?}",
342 table_id
343 ))
344 .into());
345 }
346 if table_change_log_notification_msg
347 .updated_change_log_table_ids
348 .contains(&table_id)
349 {
350 break;
351 }
352 }
353 Ok(())
354 }
355}