// risingwave_frontend/scheduler/snapshot.rs

use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
21use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
22use risingwave_hummock_sdk::{
23 FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
24};
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch, batch_query_epoch};
27use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
28use tokio::sync::watch;
29
30use crate::error::{ErrorCode, RwError};
31use crate::expr::InlineNowProcTime;
32use crate::meta_client::FrontendMetaClient;
33use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query};
34use crate::scheduler::{SchedulerError, SchedulerResult};
35
/// The snapshot a batch query reads from, determining which epoch each
/// storage-table scan observes.
#[derive(Clone)]
pub enum ReadSnapshot {
    /// Read from a hummock version pinned on this frontend: each table is
    /// read at the committed epoch recorded in the pinned version.
    FrontendPinned {
        snapshot: PinnedSnapshotRef,
    },

    /// Read the latest data at the current wall-clock epoch, including
    /// uncommitted writes (see `batch_query_epoch`, which emits `Current`).
    ReadUncommitted,

    /// An explicitly specified epoch; emitted to the batch plan as the
    /// proto `Backup` variant (presumably backup/time-travel reads —
    /// confirm against callers).
    Other(Epoch),
}
51
impl ReadSnapshot {
    /// Converts this snapshot into the protobuf `BatchQueryEpoch` carried by
    /// the batch plan, resolving the concrete epoch against the set of
    /// storage tables the query reads.
    fn batch_query_epoch(
        &self,
        read_storage_tables: &HashSet<TableId>,
    ) -> Result<BatchQueryEpoch, SchedulerError> {
        Ok(match self {
            ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch {
                epoch: Some(batch_query_epoch::Epoch::Committed(
                    BatchQueryCommittedEpoch {
                        // Minimum committed epoch across the tables to read,
                        // taken from the pinned version.
                        epoch: snapshot.batch_query_epoch(read_storage_tables)?.0,
                        hummock_version_id: snapshot.value.id,
                    },
                )),
            },
            ReadSnapshot::ReadUncommitted => BatchQueryEpoch {
                // Uncommitted read: use the current wall-clock epoch.
                epoch: Some(batch_query_epoch::Epoch::Current(Epoch::now().0)),
            },
            ReadSnapshot::Other(e) => BatchQueryEpoch {
                // Explicit epoch, sent as the proto `Backup` variant.
                epoch: Some(batch_query_epoch::Epoch::Backup(e.0)),
            },
        })
    }

    /// Fills `query_epoch` on every scan-like plan node in `query` that does
    /// not already have one set, using a single epoch computed from this
    /// snapshot over all tables touched by those unfilled nodes.
    pub fn fill_batch_query_epoch(&self, query: &mut Query) -> SchedulerResult<()> {
        // Applies `f` to the (query_epoch, table_id) pair of every node that
        // reads a storage table, visiting children before the node itself.
        fn on_node(
            execution_plan_node: &mut ExecutionPlanNode,
            f: &mut impl FnMut(&mut Option<BatchQueryEpoch>, TableId),
        ) {
            for node in &mut execution_plan_node.children {
                on_node(node, f);
            }
            let (query_epoch, table_id) = match &mut execution_plan_node.node {
                NodeBody::RowSeqScan(scan) => (
                    &mut scan.query_epoch,
                    scan.table_desc.as_ref().unwrap().table_id,
                ),
                NodeBody::DistributedLookupJoin(join) => (
                    &mut join.query_epoch,
                    join.inner_side_table_desc.as_ref().unwrap().table_id,
                ),
                NodeBody::LocalLookupJoin(join) => (
                    &mut join.query_epoch,
                    join.inner_side_table_desc.as_ref().unwrap().table_id,
                ),
                NodeBody::VectorIndexNearest(vector_index_read) => (
                    &mut vector_index_read.query_epoch,
                    vector_index_read.reader_desc.as_ref().unwrap().table_id,
                ),
                // Other node kinds carry no per-node query epoch.
                _ => {
                    return;
                }
            };
            f(query_epoch, table_id);
        }

        // Applies `f` over every stage root in the query's stage graph.
        fn on_query(query: &mut Query, mut f: impl FnMut(&mut Option<BatchQueryEpoch>, TableId)) {
            for stage in query.stage_graph.stages.values_mut() {
                on_node(&mut stage.root, &mut f);
            }
        }

        // First pass: collect the tables of nodes whose epoch is still unset.
        let mut unspecified_epoch_table_ids = HashSet::new();
        on_query(query, |query_epoch, table_id| {
            if query_epoch.is_none() {
                unspecified_epoch_table_ids.insert(table_id);
            }
        });

        // Resolve one epoch that is valid for all of those tables at once.
        let query_epoch = self.batch_query_epoch(&unspecified_epoch_table_ids)?;

        // Second pass: assign it to every node still missing an epoch;
        // nodes with a pre-set epoch are left untouched.
        on_query(query, |node_query_epoch, _| {
            if node_query_epoch.is_none() {
                *node_query_epoch = Some(query_epoch);
            }
        });

        Ok(())
    }

    /// Epoch used to inline `now()`/proctime during planning: the max
    /// committed epoch of the pinned version when one exists, otherwise
    /// the current wall-clock epoch (or the explicit epoch for `Other`).
    pub fn inline_now_proc_time(&self) -> InlineNowProcTime {
        let epoch = match self {
            ReadSnapshot::FrontendPinned { snapshot } => snapshot
                .value
                .state_table_info
                .max_table_committed_epoch()
                .map(Epoch)
                .unwrap_or_else(Epoch::now),
            ReadSnapshot::ReadUncommitted => Epoch::now(),
            ReadSnapshot::Other(epoch) => *epoch,
        };
        InlineNowProcTime::new(epoch)
    }

    /// Barrier read is only supported for uncommitted (current-epoch) reads.
    pub fn support_barrier_read(&self) -> bool {
        matches!(self, ReadSnapshot::ReadUncommitted)
    }
}
151
/// A hummock version pinned by the frontend, giving batch queries a stable
/// view of each table's committed epoch.
pub struct PinnedSnapshot {
    // The pinned frontend hummock version (id + per-table state info).
    value: FrontendHummockVersion,
}
157
158impl std::fmt::Debug for PinnedSnapshot {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 self.value.fmt(f)
161 }
162}
163
/// Shared, cheaply-clonable reference to a [`PinnedSnapshot`].
pub type PinnedSnapshotRef = Arc<PinnedSnapshot>;
166
167impl PinnedSnapshot {
168 fn batch_query_epoch(
169 &self,
170 read_storage_tables: &HashSet<TableId>,
171 ) -> Result<Epoch, SchedulerError> {
172 let epoch = read_storage_tables
174 .iter()
175 .map(|table_id| {
176 self.value
177 .state_table_info
178 .info()
179 .get(table_id)
180 .map(|info| Epoch(info.committed_epoch))
181 .ok_or_else(|| anyhow!("table id {table_id} may have been dropped"))
182 })
183 .try_fold(None, |prev_min_committed_epoch, committed_epoch| {
184 committed_epoch.map(|committed_epoch| {
185 if let Some(prev_min_committed_epoch) = prev_min_committed_epoch
186 && prev_min_committed_epoch <= committed_epoch
187 {
188 Some(prev_min_committed_epoch)
189 } else {
190 Some(committed_epoch)
191 }
192 })
193 })?
194 .unwrap_or_else(Epoch::now);
195 Ok(epoch)
196 }
197
198 pub fn version(&self) -> &FrontendHummockVersion {
199 &self.value
200 }
201}
202
203fn invalid_snapshot() -> FrontendHummockVersion {
205 FrontendHummockVersion {
206 id: INVALID_VERSION_ID,
207 state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()),
208 }
209}
210
/// Tracks the latest hummock version pinned on this frontend and lets
/// waiters observe version changes.
pub struct HummockSnapshotManager {
    // Latest pinned snapshot, broadcast via a watch channel so readers can
    // both borrow the current value and await newer versions.
    latest_snapshot: watch::Sender<PinnedSnapshotRef>,

    // Fired on every `init`/`update`; used by waiters that re-check the
    // latest snapshot after each notification (see
    // `wait_table_change_log_notification`).
    version_update_notification_sender: watch::Sender<()>,
}
226
/// Shared reference to the [`HummockSnapshotManager`].
pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
228
impl HummockSnapshotManager {
    /// Creates a manager that starts from an invalid placeholder version;
    /// the real version arrives later via `init`/`update`.
    pub fn new(_meta_client: Arc<dyn FrontendMetaClient>) -> Self {
        let latest_snapshot = Arc::new(PinnedSnapshot {
            value: invalid_snapshot(),
        });

        let (latest_snapshot, _) = watch::channel(latest_snapshot);

        let (version_update_notification_sender, _) = watch::channel(());

        Self {
            latest_snapshot,
            version_update_notification_sender,
        }
    }

    /// Returns the latest pinned snapshot (cheap: clones an `Arc`).
    pub fn acquire(&self) -> PinnedSnapshotRef {
        self.latest_snapshot.borrow().clone()
    }

    /// Installs a full version received from meta.
    pub fn init(&self, version: FrontendHummockVersion) {
        // NOTE(review): here the notification is sent *before* the version
        // is applied, while `update` sends it *after* — confirm this
        // ordering difference is intended.
        self.version_update_notification_sender.send(()).ok();

        self.update_inner(|_| Some(version));
    }

    /// Applies a batch of version deltas on top of the current version and
    /// then notifies waiters.
    pub fn update(&self, deltas: HummockVersionDeltas) {
        self.update_inner(|old_snapshot| {
            // No deltas: leave the current snapshot untouched.
            if deltas.version_deltas.is_empty() {
                return None;
            }
            let mut snapshot = old_snapshot.clone();
            for delta in deltas.version_deltas {
                snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta));
            }
            Some(snapshot)
        });
        // Note: this fires even when the delta set was empty and the
        // snapshot did not change.
        self.version_update_notification_sender.send(()).ok();
    }

    /// Test helper: registers `table_id` with an invalid committed epoch,
    /// bumping the version id so `update_inner` doesn't treat it as stale.
    pub fn add_table_for_test(&self, table_id: TableId) {
        self.update_inner(|version| {
            let mut version = version.clone();
            version.id += 1;
            version.state_table_info.apply_delta(
                &HashMap::from_iter([(
                    table_id,
                    StateTableInfoDelta {
                        committed_epoch: INVALID_EPOCH,
                        compaction_group_id: 0.into(),
                    },
                )]),
                &HashSet::new(),
            );
            Some(version)
        });
    }

    /// Replaces the current snapshot with the one produced by
    /// `get_new_snapshot`, if any. An update whose version id equals the
    /// current one is silently dropped; a strictly older id panics as a
    /// stale version. Subscribers are only notified on an actual change.
    fn update_inner(
        &self,
        get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option<FrontendHummockVersion>,
    ) {
        self.latest_snapshot.send_if_modified(move |old_snapshot| {
            let new_snapshot = get_new_snapshot(&old_snapshot.value);
            let Some(snapshot) = new_snapshot else {
                return false;
            };
            if snapshot.id <= old_snapshot.value.id {
                // Equal id: no-op. Strictly smaller id: versions arrived
                // out of order — treat as a bug.
                assert_eq!(
                    snapshot.id, old_snapshot.value.id,
                    "receive stale frontend version"
                );
                return false;
            }
            *old_snapshot = Arc::new(PinnedSnapshot { value: snapshot });

            true
        });
    }

    /// Waits until the pinned version id reaches at least `version_id`.
    ///
    /// Panics if the snapshot watch channel closes while waiting.
    pub async fn wait(&self, version_id: HummockVersionId) {
        let mut rx = self.latest_snapshot.subscribe();
        while rx.borrow_and_update().value.id < version_id {
            rx.changed().await.unwrap();
        }
    }

    /// Waits until `table_id`'s committed epoch reaches `seek_timestamp`,
    /// re-checking the latest snapshot after every version notification.
    ///
    /// Errors if the notification channel closes, or if the table vanishes
    /// from the version's state-table info (e.g. it was dropped).
    pub async fn wait_table_change_log_notification(
        &self,
        table_id: TableId,
        seek_timestamp: u64,
    ) -> Result<(), RwError> {
        let mut rx = self.version_update_notification_sender.subscribe();
        loop {
            rx.changed()
                .await
                .map_err(|_| ErrorCode::InternalError("cursor notify channel is closed.".into()))?;
            // Mark the notification as seen so the next `changed()` waits
            // for a fresh one instead of returning immediately.
            let _ = rx.borrow_and_update();
            if let Some(info) = self
                .acquire()
                .version()
                .state_table_info
                .info()
                .get(&table_id)
            {
                if info.committed_epoch >= seek_timestamp {
                    break;
                }
            } else {
                return Err(ErrorCode::InternalError(format!(
                    "Cursor dependent table deleted: table_id is {:?}",
                    table_id
                ))
                .into());
            }
        }
        Ok(())
    }
}