risingwave_frontend/scheduler/
snapshot.rs1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
21use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
22use risingwave_hummock_sdk::{
23 FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
24};
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch, batch_query_epoch};
27use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
28use tokio::sync::watch;
29
30use crate::error::{ErrorCode, RwError};
31use crate::expr::InlineNowProcTime;
32use crate::meta_client::FrontendMetaClient;
33use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query};
34use crate::scheduler::{SchedulerError, SchedulerResult};
35
36#[derive(Clone)]
38pub enum ReadSnapshot {
39 FrontendPinned {
41 snapshot: PinnedSnapshotRef,
42 },
43
44 ReadUncommitted,
45
46 Other(Epoch),
50}
51
52impl ReadSnapshot {
53 fn batch_query_epoch(
55 &self,
56 read_storage_tables: &HashSet<TableId>,
57 ) -> Result<BatchQueryEpoch, SchedulerError> {
58 Ok(match self {
59 ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch {
60 epoch: Some(batch_query_epoch::Epoch::Committed(
61 BatchQueryCommittedEpoch {
62 epoch: snapshot.batch_query_epoch(read_storage_tables)?.0,
63 hummock_version_id: snapshot.value.id.to_u64(),
64 },
65 )),
66 },
67 ReadSnapshot::ReadUncommitted => BatchQueryEpoch {
68 epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
69 },
70 ReadSnapshot::Other(e) => BatchQueryEpoch {
71 epoch: Some(batch_query_epoch::Epoch::Backup(e.0)),
72 },
73 })
74 }
75
76 pub fn fill_batch_query_epoch(&self, query: &mut Query) -> SchedulerResult<()> {
77 fn on_node(
78 execution_plan_node: &mut ExecutionPlanNode,
79 f: &mut impl FnMut(&mut Option<BatchQueryEpoch>, u32),
80 ) {
81 for node in &mut execution_plan_node.children {
82 on_node(node, f);
83 }
84 let (query_epoch, table_id) = match &mut execution_plan_node.node {
85 NodeBody::RowSeqScan(scan) => (
86 &mut scan.query_epoch,
87 scan.table_desc.as_ref().unwrap().table_id,
88 ),
89 NodeBody::DistributedLookupJoin(join) => (
90 &mut join.query_epoch,
91 join.inner_side_table_desc.as_ref().unwrap().table_id,
92 ),
93 NodeBody::LocalLookupJoin(join) => (
94 &mut join.query_epoch,
95 join.inner_side_table_desc.as_ref().unwrap().table_id,
96 ),
97 NodeBody::VectorIndexNearest(vector_index_read) => (
98 &mut vector_index_read.query_epoch,
99 vector_index_read.reader_desc.as_ref().unwrap().table_id,
100 ),
101 _ => {
102 return;
103 }
104 };
105 f(query_epoch, table_id);
106 }
107
108 fn on_query(query: &mut Query, mut f: impl FnMut(&mut Option<BatchQueryEpoch>, u32)) {
109 for stage in query.stage_graph.stages.values_mut() {
110 on_node(&mut stage.root, &mut f);
111 }
112 }
113
114 let mut unspecified_epoch_table_ids = HashSet::new();
115 on_query(query, |query_epoch, table_id| {
116 if query_epoch.is_none() {
117 unspecified_epoch_table_ids.insert(table_id.into());
118 }
119 });
120
121 let query_epoch = self.batch_query_epoch(&unspecified_epoch_table_ids)?;
122
123 on_query(query, |node_query_epoch, _| {
124 if node_query_epoch.is_none() {
125 *node_query_epoch = Some(query_epoch);
126 }
127 });
128
129 Ok(())
130 }
131
132 pub fn inline_now_proc_time(&self) -> InlineNowProcTime {
133 let epoch = match self {
134 ReadSnapshot::FrontendPinned { snapshot } => snapshot
135 .value
136 .state_table_info
137 .max_table_committed_epoch()
138 .map(Epoch)
139 .unwrap_or_else(Epoch::now),
140 ReadSnapshot::ReadUncommitted => Epoch::now(),
141 ReadSnapshot::Other(epoch) => *epoch,
142 };
143 InlineNowProcTime::new(epoch)
144 }
145
146 pub fn support_barrier_read(&self) -> bool {
148 matches!(self, ReadSnapshot::ReadUncommitted)
149 }
150}
151
152pub struct PinnedSnapshot {
155 value: FrontendHummockVersion,
156}
157
158impl std::fmt::Debug for PinnedSnapshot {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 self.value.fmt(f)
161 }
162}
163
164pub type PinnedSnapshotRef = Arc<PinnedSnapshot>;
166
167impl PinnedSnapshot {
168 fn batch_query_epoch(
169 &self,
170 read_storage_tables: &HashSet<TableId>,
171 ) -> Result<Epoch, SchedulerError> {
172 let epoch = read_storage_tables
174 .iter()
175 .map(|table_id| {
176 self.value
177 .state_table_info
178 .info()
179 .get(table_id)
180 .map(|info| Epoch(info.committed_epoch))
181 .ok_or_else(|| anyhow!("table id {table_id} may have been dropped"))
182 })
183 .try_fold(None, |prev_min_committed_epoch, committed_epoch| {
184 committed_epoch.map(|committed_epoch| {
185 if let Some(prev_min_committed_epoch) = prev_min_committed_epoch
186 && prev_min_committed_epoch <= committed_epoch
187 {
188 Some(prev_min_committed_epoch)
189 } else {
190 Some(committed_epoch)
191 }
192 })
193 })?
194 .unwrap_or_else(Epoch::now);
195 Ok(epoch)
196 }
197
198 pub fn version(&self) -> &FrontendHummockVersion {
199 &self.value
200 }
201
202 pub fn list_change_log_epochs(
203 &self,
204 table_id: TableId,
205 min_epoch: u64,
206 max_count: u32,
207 ) -> Vec<u64> {
208 if let Some(table_change_log) = self.value.table_change_log.get(&table_id) {
209 let table_change_log = table_change_log.clone();
210 table_change_log.get_non_empty_epochs(min_epoch, max_count as usize)
211 } else {
212 vec![]
213 }
214 }
215}
216
217fn invalid_snapshot() -> FrontendHummockVersion {
219 FrontendHummockVersion {
220 id: INVALID_VERSION_ID,
221 state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()),
222 table_change_log: Default::default(),
223 }
224}
225
226pub struct HummockSnapshotManager {
228 latest_snapshot: watch::Sender<PinnedSnapshotRef>,
238
239 table_change_log_notification_sender: watch::Sender<TableChangeLogNotificationMsg>,
240}
241
242#[derive(Default)]
243struct TableChangeLogNotificationMsg {
244 updated_change_log_table_ids: HashSet<TableId>,
245 deleted_table_ids: HashSet<TableId>,
246}
247
248pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
249
250impl HummockSnapshotManager {
251 pub fn new(_meta_client: Arc<dyn FrontendMetaClient>) -> Self {
252 let latest_snapshot = Arc::new(PinnedSnapshot {
253 value: invalid_snapshot(),
254 });
255
256 let (latest_snapshot, _) = watch::channel(latest_snapshot);
257
258 let (table_change_log_notification_sender, _) =
259 watch::channel(TableChangeLogNotificationMsg::default());
260
261 Self {
262 latest_snapshot,
263 table_change_log_notification_sender,
264 }
265 }
266
267 pub fn acquire(&self) -> PinnedSnapshotRef {
269 self.latest_snapshot.borrow().clone()
270 }
271
272 pub fn init(&self, version: FrontendHummockVersion) {
273 let updated_change_log_table_ids: HashSet<_> = version
274 .table_change_log
275 .iter()
276 .filter_map(|(table_id, change_log)| {
277 if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() {
278 None
279 } else {
280 Some(*table_id)
281 }
282 })
283 .collect();
284 self.table_change_log_notification_sender
285 .send(TableChangeLogNotificationMsg {
286 updated_change_log_table_ids,
287 deleted_table_ids: Default::default(),
288 })
289 .ok();
290
291 self.update_inner(|_| Some(version));
292 }
293
294 pub fn update(&self, deltas: HummockVersionDeltas) {
298 let updated_change_log_table_ids: HashSet<_> = deltas
299 .version_deltas
300 .iter()
301 .flat_map(|version_deltas| &version_deltas.change_log_delta)
302 .filter_map(|(table_id, change_log)| match change_log.new_log.as_ref() {
303 Some(new_log) => {
304 let new_value_empty = new_log.new_value.is_empty();
305 let old_value_empty = new_log.old_value.is_empty();
306 if !new_value_empty || !old_value_empty {
307 Some(TableId::new(*table_id))
308 } else {
309 None
310 }
311 }
312 None => None,
313 })
314 .collect();
315 let deleted_table_ids: HashSet<_> = deltas
316 .version_deltas
317 .iter()
318 .flat_map(|version_deltas| {
319 version_deltas
320 .removed_table_ids
321 .iter()
322 .map(|table_id| table_id.into())
323 })
324 .collect();
325 self.table_change_log_notification_sender
326 .send(TableChangeLogNotificationMsg {
327 updated_change_log_table_ids,
328 deleted_table_ids,
329 })
330 .ok();
331
332 self.update_inner(|old_snapshot| {
333 if deltas.version_deltas.is_empty() {
334 return None;
335 }
336 let mut snapshot = old_snapshot.clone();
337 for delta in deltas.version_deltas {
338 snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta));
339 }
340 Some(snapshot)
341 })
342 }
343
344 pub fn add_table_for_test(&self, table_id: TableId) {
345 self.update_inner(|version| {
346 let mut version = version.clone();
347 version.id = version.id.next();
348 version.state_table_info.apply_delta(
349 &HashMap::from_iter([(
350 table_id,
351 StateTableInfoDelta {
352 committed_epoch: INVALID_EPOCH,
353 compaction_group_id: 0,
354 },
355 )]),
356 &HashSet::new(),
357 );
358 Some(version)
359 });
360 }
361
362 fn update_inner(
363 &self,
364 get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option<FrontendHummockVersion>,
365 ) {
366 self.latest_snapshot.send_if_modified(move |old_snapshot| {
367 let new_snapshot = get_new_snapshot(&old_snapshot.value);
368 let Some(snapshot) = new_snapshot else {
369 return false;
370 };
371 if snapshot.id <= old_snapshot.value.id {
372 assert_eq!(
373 snapshot.id, old_snapshot.value.id,
374 "receive stale frontend version"
375 );
376 return false;
377 }
378 *old_snapshot = Arc::new(PinnedSnapshot { value: snapshot });
379
380 true
381 });
382 }
383
384 pub async fn wait(&self, version_id: HummockVersionId) {
386 let mut rx = self.latest_snapshot.subscribe();
387 while rx.borrow_and_update().value.id < version_id {
388 rx.changed().await.unwrap();
389 }
390 }
391
392 pub async fn wait_table_change_log_notification(
393 &self,
394 table_id: TableId,
395 ) -> Result<(), RwError> {
396 let mut rx = self.table_change_log_notification_sender.subscribe();
397 loop {
398 rx.changed()
399 .await
400 .map_err(|_| ErrorCode::InternalError("cursor notify channel is closed.".into()))?;
401 let table_change_log_notification_msg = rx.borrow_and_update();
402 if table_change_log_notification_msg
403 .deleted_table_ids
404 .contains(&table_id)
405 {
406 return Err(ErrorCode::InternalError(format!(
407 "Cursor dependent table deleted: table_id is {:?}",
408 table_id
409 ))
410 .into());
411 }
412 if table_change_log_notification_msg
413 .updated_change_log_table_ids
414 .contains(&table_id)
415 {
416 break;
417 }
418 }
419 Ok(())
420 }
421}