risingwave_frontend/scheduler/
snapshot.rs1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
21use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
22use risingwave_hummock_sdk::{
23 FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
24};
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch, batch_query_epoch};
27use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
28use tokio::sync::watch;
29
30use crate::error::{ErrorCode, RwError};
31use crate::expr::InlineNowProcTime;
32use crate::meta_client::FrontendMetaClient;
33use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query};
34use crate::scheduler::{SchedulerError, SchedulerResult};
35
36#[derive(Clone)]
38pub enum ReadSnapshot {
39 FrontendPinned {
41 snapshot: PinnedSnapshotRef,
42 },
43
44 ReadUncommitted,
45
46 Other(Epoch),
50}
51
52impl ReadSnapshot {
53 fn batch_query_epoch(
55 &self,
56 read_storage_tables: &HashSet<TableId>,
57 ) -> Result<BatchQueryEpoch, SchedulerError> {
58 Ok(match self {
59 ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch {
60 epoch: Some(batch_query_epoch::Epoch::Committed(
61 BatchQueryCommittedEpoch {
62 epoch: snapshot.batch_query_epoch(read_storage_tables)?.0,
63 hummock_version_id: snapshot.value.id.to_u64(),
64 },
65 )),
66 },
67 ReadSnapshot::ReadUncommitted => BatchQueryEpoch {
68 epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
69 },
70 ReadSnapshot::Other(e) => BatchQueryEpoch {
71 epoch: Some(batch_query_epoch::Epoch::Backup(e.0)),
72 },
73 })
74 }
75
76 pub fn fill_batch_query_epoch(&self, query: &mut Query) -> SchedulerResult<()> {
77 fn on_node(
78 execution_plan_node: &mut ExecutionPlanNode,
79 f: &mut impl FnMut(&mut Option<BatchQueryEpoch>, TableId),
80 ) {
81 for node in &mut execution_plan_node.children {
82 on_node(node, f);
83 }
84 let (query_epoch, table_id) = match &mut execution_plan_node.node {
85 NodeBody::RowSeqScan(scan) => (
86 &mut scan.query_epoch,
87 scan.table_desc.as_ref().unwrap().table_id,
88 ),
89 NodeBody::DistributedLookupJoin(join) => (
90 &mut join.query_epoch,
91 join.inner_side_table_desc.as_ref().unwrap().table_id,
92 ),
93 NodeBody::LocalLookupJoin(join) => (
94 &mut join.query_epoch,
95 join.inner_side_table_desc.as_ref().unwrap().table_id,
96 ),
97 NodeBody::VectorIndexNearest(vector_index_read) => (
98 &mut vector_index_read.query_epoch,
99 vector_index_read.reader_desc.as_ref().unwrap().table_id,
100 ),
101 _ => {
102 return;
103 }
104 };
105 f(query_epoch, table_id);
106 }
107
108 fn on_query(query: &mut Query, mut f: impl FnMut(&mut Option<BatchQueryEpoch>, TableId)) {
109 for stage in query.stage_graph.stages.values_mut() {
110 on_node(&mut stage.root, &mut f);
111 }
112 }
113
114 let mut unspecified_epoch_table_ids = HashSet::new();
115 on_query(query, |query_epoch, table_id| {
116 if query_epoch.is_none() {
117 unspecified_epoch_table_ids.insert(table_id);
118 }
119 });
120
121 let query_epoch = self.batch_query_epoch(&unspecified_epoch_table_ids)?;
122
123 on_query(query, |node_query_epoch, _| {
124 if node_query_epoch.is_none() {
125 *node_query_epoch = Some(query_epoch);
126 }
127 });
128
129 Ok(())
130 }
131
132 pub fn inline_now_proc_time(&self) -> InlineNowProcTime {
133 let epoch = match self {
134 ReadSnapshot::FrontendPinned { snapshot } => snapshot
135 .value
136 .state_table_info
137 .max_table_committed_epoch()
138 .map(Epoch)
139 .unwrap_or_else(Epoch::now),
140 ReadSnapshot::ReadUncommitted => Epoch::now(),
141 ReadSnapshot::Other(epoch) => *epoch,
142 };
143 InlineNowProcTime::new(epoch)
144 }
145
146 pub fn support_barrier_read(&self) -> bool {
148 matches!(self, ReadSnapshot::ReadUncommitted)
149 }
150}
151
152pub struct PinnedSnapshot {
155 value: FrontendHummockVersion,
156}
157
158impl std::fmt::Debug for PinnedSnapshot {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 self.value.fmt(f)
161 }
162}
163
164pub type PinnedSnapshotRef = Arc<PinnedSnapshot>;
166
167impl PinnedSnapshot {
168 fn batch_query_epoch(
169 &self,
170 read_storage_tables: &HashSet<TableId>,
171 ) -> Result<Epoch, SchedulerError> {
172 let epoch = read_storage_tables
174 .iter()
175 .map(|table_id| {
176 self.value
177 .state_table_info
178 .info()
179 .get(table_id)
180 .map(|info| Epoch(info.committed_epoch))
181 .ok_or_else(|| anyhow!("table id {table_id} may have been dropped"))
182 })
183 .try_fold(None, |prev_min_committed_epoch, committed_epoch| {
184 committed_epoch.map(|committed_epoch| {
185 if let Some(prev_min_committed_epoch) = prev_min_committed_epoch
186 && prev_min_committed_epoch <= committed_epoch
187 {
188 Some(prev_min_committed_epoch)
189 } else {
190 Some(committed_epoch)
191 }
192 })
193 })?
194 .unwrap_or_else(Epoch::now);
195 Ok(epoch)
196 }
197
198 pub fn version(&self) -> &FrontendHummockVersion {
199 &self.value
200 }
201
202 pub fn list_change_log_epochs(
203 &self,
204 table_id: TableId,
205 min_epoch: u64,
206 max_count: u32,
207 ) -> Vec<u64> {
208 if let Some(table_change_log) = self.value.table_change_log.get(&table_id) {
209 let table_change_log = table_change_log.clone();
210 table_change_log.get_non_empty_epochs(min_epoch, max_count as usize)
211 } else {
212 vec![]
213 }
214 }
215}
216
217fn invalid_snapshot() -> FrontendHummockVersion {
219 FrontendHummockVersion {
220 id: INVALID_VERSION_ID,
221 state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()),
222 table_change_log: Default::default(),
223 }
224}
225
226pub struct HummockSnapshotManager {
228 latest_snapshot: watch::Sender<PinnedSnapshotRef>,
238
239 table_change_log_notification_sender: watch::Sender<TableChangeLogNotificationMsg>,
240}
241
242#[derive(Default)]
243struct TableChangeLogNotificationMsg {
244 updated_change_log_table_ids: HashSet<TableId>,
245 deleted_table_ids: HashSet<TableId>,
246}
247
248pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
249
250impl HummockSnapshotManager {
251 pub fn new(_meta_client: Arc<dyn FrontendMetaClient>) -> Self {
252 let latest_snapshot = Arc::new(PinnedSnapshot {
253 value: invalid_snapshot(),
254 });
255
256 let (latest_snapshot, _) = watch::channel(latest_snapshot);
257
258 let (table_change_log_notification_sender, _) =
259 watch::channel(TableChangeLogNotificationMsg::default());
260
261 Self {
262 latest_snapshot,
263 table_change_log_notification_sender,
264 }
265 }
266
267 pub fn acquire(&self) -> PinnedSnapshotRef {
269 self.latest_snapshot.borrow().clone()
270 }
271
272 pub fn init(&self, version: FrontendHummockVersion) {
273 let updated_change_log_table_ids: HashSet<_> = version
274 .table_change_log
275 .iter()
276 .filter_map(|(table_id, change_log)| {
277 if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() {
278 None
279 } else {
280 Some(*table_id)
281 }
282 })
283 .collect();
284 self.table_change_log_notification_sender
285 .send(TableChangeLogNotificationMsg {
286 updated_change_log_table_ids,
287 deleted_table_ids: Default::default(),
288 })
289 .ok();
290
291 self.update_inner(|_| Some(version));
292 }
293
294 pub fn update(&self, deltas: HummockVersionDeltas) {
298 let updated_change_log_table_ids: HashSet<_> = deltas
299 .version_deltas
300 .iter()
301 .flat_map(|version_deltas| &version_deltas.change_log_delta)
302 .filter_map(|(table_id, change_log)| match change_log.new_log.as_ref() {
303 Some(new_log) => {
304 let new_value_empty = new_log.new_value.is_empty();
305 let old_value_empty = new_log.old_value.is_empty();
306 if !new_value_empty || !old_value_empty {
307 Some(*table_id)
308 } else {
309 None
310 }
311 }
312 None => None,
313 })
314 .collect();
315 let deleted_table_ids: HashSet<_> = deltas
316 .version_deltas
317 .iter()
318 .flat_map(|version_deltas| version_deltas.removed_table_ids.iter().copied())
319 .collect();
320 self.table_change_log_notification_sender
321 .send(TableChangeLogNotificationMsg {
322 updated_change_log_table_ids,
323 deleted_table_ids,
324 })
325 .ok();
326
327 self.update_inner(|old_snapshot| {
328 if deltas.version_deltas.is_empty() {
329 return None;
330 }
331 let mut snapshot = old_snapshot.clone();
332 for delta in deltas.version_deltas {
333 snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta));
334 }
335 Some(snapshot)
336 })
337 }
338
339 pub fn add_table_for_test(&self, table_id: TableId) {
340 self.update_inner(|version| {
341 let mut version = version.clone();
342 version.id = version.id.next();
343 version.state_table_info.apply_delta(
344 &HashMap::from_iter([(
345 table_id,
346 StateTableInfoDelta {
347 committed_epoch: INVALID_EPOCH,
348 compaction_group_id: 0,
349 },
350 )]),
351 &HashSet::new(),
352 );
353 Some(version)
354 });
355 }
356
357 fn update_inner(
358 &self,
359 get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option<FrontendHummockVersion>,
360 ) {
361 self.latest_snapshot.send_if_modified(move |old_snapshot| {
362 let new_snapshot = get_new_snapshot(&old_snapshot.value);
363 let Some(snapshot) = new_snapshot else {
364 return false;
365 };
366 if snapshot.id <= old_snapshot.value.id {
367 assert_eq!(
368 snapshot.id, old_snapshot.value.id,
369 "receive stale frontend version"
370 );
371 return false;
372 }
373 *old_snapshot = Arc::new(PinnedSnapshot { value: snapshot });
374
375 true
376 });
377 }
378
379 pub async fn wait(&self, version_id: HummockVersionId) {
381 let mut rx = self.latest_snapshot.subscribe();
382 while rx.borrow_and_update().value.id < version_id {
383 rx.changed().await.unwrap();
384 }
385 }
386
387 pub async fn wait_table_change_log_notification(
388 &self,
389 table_id: TableId,
390 ) -> Result<(), RwError> {
391 let mut rx = self.table_change_log_notification_sender.subscribe();
392 loop {
393 rx.changed()
394 .await
395 .map_err(|_| ErrorCode::InternalError("cursor notify channel is closed.".into()))?;
396 let table_change_log_notification_msg = rx.borrow_and_update();
397 if table_change_log_notification_msg
398 .deleted_table_ids
399 .contains(&table_id)
400 {
401 return Err(ErrorCode::InternalError(format!(
402 "Cursor dependent table deleted: table_id is {:?}",
403 table_id
404 ))
405 .into());
406 }
407 if table_change_log_notification_msg
408 .updated_change_log_table_ids
409 .contains(&table_id)
410 {
411 break;
412 }
413 }
414 Ok(())
415 }
416}