risingwave_frontend/scheduler/
snapshot.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
21use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
22use risingwave_hummock_sdk::{
23    FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
24};
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch, batch_query_epoch};
27use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
28use tokio::sync::watch;
29
30use crate::error::{ErrorCode, RwError};
31use crate::expr::InlineNowProcTime;
32use crate::meta_client::FrontendMetaClient;
33use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query};
34use crate::scheduler::{SchedulerError, SchedulerResult};
35
/// The storage snapshot to read from in a query, which can be freely cloned.
#[derive(Clone)]
pub enum ReadSnapshot {
    /// A frontend-pinned snapshot.
    ///
    /// Translated into a `Committed` query epoch that carries the pinned hummock version id.
    FrontendPinned {
        snapshot: PinnedSnapshotRef,
    },

    /// Do not read from a pinned committed snapshot.
    ///
    /// Translated into a `Current` query epoch of `u64::MAX`; this is the only variant for
    /// which [`ReadSnapshot::support_barrier_read`] returns `true`.
    ReadUncommitted,

    /// Other arbitrary epoch, e.g. user specified.
    /// Availability and consistency of underlying data should be guaranteed accordingly.
    /// Currently it's only used for querying meta snapshot backup.
    Other(Epoch),
}
51
52impl ReadSnapshot {
53    /// Get the [`BatchQueryEpoch`] for this snapshot.
54    fn batch_query_epoch(
55        &self,
56        read_storage_tables: &HashSet<TableId>,
57    ) -> Result<BatchQueryEpoch, SchedulerError> {
58        Ok(match self {
59            ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch {
60                epoch: Some(batch_query_epoch::Epoch::Committed(
61                    BatchQueryCommittedEpoch {
62                        epoch: snapshot.batch_query_epoch(read_storage_tables)?.0,
63                        hummock_version_id: snapshot.value.id.to_u64(),
64                    },
65                )),
66            },
67            ReadSnapshot::ReadUncommitted => BatchQueryEpoch {
68                epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
69            },
70            ReadSnapshot::Other(e) => BatchQueryEpoch {
71                epoch: Some(batch_query_epoch::Epoch::Backup(e.0)),
72            },
73        })
74    }
75
76    pub fn fill_batch_query_epoch(&self, query: &mut Query) -> SchedulerResult<()> {
77        fn on_node(
78            execution_plan_node: &mut ExecutionPlanNode,
79            f: &mut impl FnMut(&mut Option<BatchQueryEpoch>, u32),
80        ) {
81            for node in &mut execution_plan_node.children {
82                on_node(node, f);
83            }
84            let (query_epoch, table_id) = match &mut execution_plan_node.node {
85                NodeBody::RowSeqScan(scan) => (
86                    &mut scan.query_epoch,
87                    scan.table_desc.as_ref().unwrap().table_id,
88                ),
89                NodeBody::DistributedLookupJoin(join) => (
90                    &mut join.query_epoch,
91                    join.inner_side_table_desc.as_ref().unwrap().table_id,
92                ),
93                NodeBody::LocalLookupJoin(join) => (
94                    &mut join.query_epoch,
95                    join.inner_side_table_desc.as_ref().unwrap().table_id,
96                ),
97                NodeBody::VectorIndexNearest(vector_index_read) => (
98                    &mut vector_index_read.query_epoch,
99                    vector_index_read.reader_desc.as_ref().unwrap().table_id,
100                ),
101                _ => {
102                    return;
103                }
104            };
105            f(query_epoch, table_id);
106        }
107
108        fn on_query(query: &mut Query, mut f: impl FnMut(&mut Option<BatchQueryEpoch>, u32)) {
109            for stage in query.stage_graph.stages.values_mut() {
110                on_node(&mut stage.root, &mut f);
111            }
112        }
113
114        let mut unspecified_epoch_table_ids = HashSet::new();
115        on_query(query, |query_epoch, table_id| {
116            if query_epoch.is_none() {
117                unspecified_epoch_table_ids.insert(table_id.into());
118            }
119        });
120
121        let query_epoch = self.batch_query_epoch(&unspecified_epoch_table_ids)?;
122
123        on_query(query, |node_query_epoch, _| {
124            if node_query_epoch.is_none() {
125                *node_query_epoch = Some(query_epoch);
126            }
127        });
128
129        Ok(())
130    }
131
132    pub fn inline_now_proc_time(&self) -> InlineNowProcTime {
133        let epoch = match self {
134            ReadSnapshot::FrontendPinned { snapshot } => snapshot
135                .value
136                .state_table_info
137                .max_table_committed_epoch()
138                .map(Epoch)
139                .unwrap_or_else(Epoch::now),
140            ReadSnapshot::ReadUncommitted => Epoch::now(),
141            ReadSnapshot::Other(epoch) => *epoch,
142        };
143        InlineNowProcTime::new(epoch)
144    }
145
146    /// Returns true if this snapshot is a barrier read.
147    pub fn support_barrier_read(&self) -> bool {
148        matches!(self, ReadSnapshot::ReadUncommitted)
149    }
150}
151
// DO NOT implement `Clone` for `PinnedSnapshot` because it's a "resource" that should always be a
// singleton for each snapshot. Use `PinnedSnapshotRef` instead.
/// A hummock version held by the frontend, shared through [`PinnedSnapshotRef`].
pub struct PinnedSnapshot {
    /// The frontend view of the hummock version backing this snapshot.
    value: FrontendHummockVersion,
}
157
// Formats a `PinnedSnapshot` by delegating to the wrapped `FrontendHummockVersion`, so the
// wrapper itself adds nothing to the output.
impl std::fmt::Debug for PinnedSnapshot {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Forward to the inner value's `fmt`.
        self.value.fmt(f)
    }
}
163
/// A reference to a frontend-pinned snapshot.
///
/// This is the shareable handle to a [`PinnedSnapshot`], which itself must not be cloned.
pub type PinnedSnapshotRef = Arc<PinnedSnapshot>;
166
167impl PinnedSnapshot {
168    fn batch_query_epoch(
169        &self,
170        read_storage_tables: &HashSet<TableId>,
171    ) -> Result<Epoch, SchedulerError> {
172        // use the min committed epoch of tables involved in the scan
173        let epoch = read_storage_tables
174            .iter()
175            .map(|table_id| {
176                self.value
177                    .state_table_info
178                    .info()
179                    .get(table_id)
180                    .map(|info| Epoch(info.committed_epoch))
181                    .ok_or_else(|| anyhow!("table id {table_id} may have been dropped"))
182            })
183            .try_fold(None, |prev_min_committed_epoch, committed_epoch| {
184                committed_epoch.map(|committed_epoch| {
185                    if let Some(prev_min_committed_epoch) = prev_min_committed_epoch
186                        && prev_min_committed_epoch <= committed_epoch
187                    {
188                        Some(prev_min_committed_epoch)
189                    } else {
190                        Some(committed_epoch)
191                    }
192                })
193            })?
194            .unwrap_or_else(Epoch::now);
195        Ok(epoch)
196    }
197
    /// Returns the frontend hummock version backing this snapshot.
    pub fn version(&self) -> &FrontendHummockVersion {
        &self.value
    }
201
202    pub fn list_change_log_epochs(
203        &self,
204        table_id: TableId,
205        min_epoch: u64,
206        max_count: u32,
207    ) -> Vec<u64> {
208        if let Some(table_change_log) = self.value.table_change_log.get(&table_id) {
209            let table_change_log = table_change_log.clone();
210            table_change_log.get_non_empty_epochs(min_epoch, max_count as usize)
211        } else {
212            vec![]
213        }
214    }
215}
216
217/// Returns an invalid snapshot, used for initial values.
218fn invalid_snapshot() -> FrontendHummockVersion {
219    FrontendHummockVersion {
220        id: INVALID_VERSION_ID,
221        state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()),
222        table_change_log: Default::default(),
223    }
224}
225
/// Cache of hummock snapshot in meta.
pub struct HummockSnapshotManager {
    /// The latest snapshot synced from the meta service.
    ///
    /// The `max_committed_epoch` and `max_current_epoch` are pushed from meta node to reduce rpc
    /// number.
    ///
    /// We have two epochs (committed and current). We only use `committed_epoch` to pin or
    /// unpin, because `committed_epoch` is always less than or equal to `current_epoch`, and
    /// the data with `current_epoch` is always in the shared buffer, so it will never be gc-ed
    /// before the data of `committed_epoch`.
    latest_snapshot: watch::Sender<PinnedSnapshotRef>,

    /// Publishes, on every `init` and `update`, which tables received non-empty change log
    /// entries and which tables were removed. Consumed by
    /// `wait_table_change_log_notification`.
    table_change_log_notification_sender: watch::Sender<TableChangeLogNotificationMsg>,
}
241
/// Message broadcast to change-log waiters whenever the manager is initialized or updated.
#[derive(Default)]
struct TableChangeLogNotificationMsg {
    /// Tables whose change log received non-empty entries.
    updated_change_log_table_ids: HashSet<TableId>,
    /// Tables removed by the applied version deltas.
    deleted_table_ids: HashSet<TableId>,
}
247
/// A shared reference to the [`HummockSnapshotManager`].
pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
249
250impl HummockSnapshotManager {
251    pub fn new(_meta_client: Arc<dyn FrontendMetaClient>) -> Self {
252        let latest_snapshot = Arc::new(PinnedSnapshot {
253            value: invalid_snapshot(),
254        });
255
256        let (latest_snapshot, _) = watch::channel(latest_snapshot);
257
258        let (table_change_log_notification_sender, _) =
259            watch::channel(TableChangeLogNotificationMsg::default());
260
261        Self {
262            latest_snapshot,
263            table_change_log_notification_sender,
264        }
265    }
266
267    /// Acquire the latest snapshot by increasing its reference count.
268    pub fn acquire(&self) -> PinnedSnapshotRef {
269        self.latest_snapshot.borrow().clone()
270    }
271
272    pub fn init(&self, version: FrontendHummockVersion) {
273        let updated_change_log_table_ids: HashSet<_> = version
274            .table_change_log
275            .iter()
276            .filter_map(|(table_id, change_log)| {
277                if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() {
278                    None
279                } else {
280                    Some(*table_id)
281                }
282            })
283            .collect();
284        self.table_change_log_notification_sender
285            .send(TableChangeLogNotificationMsg {
286                updated_change_log_table_ids,
287                deleted_table_ids: Default::default(),
288            })
289            .ok();
290
291        self.update_inner(|_| Some(version));
292    }
293
294    /// Update the latest snapshot.
295    ///
296    /// Should only be called by the observer manager.
297    pub fn update(&self, deltas: HummockVersionDeltas) {
298        let updated_change_log_table_ids: HashSet<_> = deltas
299            .version_deltas
300            .iter()
301            .flat_map(|version_deltas| &version_deltas.change_log_delta)
302            .filter_map(|(table_id, change_log)| match change_log.new_log.as_ref() {
303                Some(new_log) => {
304                    let new_value_empty = new_log.new_value.is_empty();
305                    let old_value_empty = new_log.old_value.is_empty();
306                    if !new_value_empty || !old_value_empty {
307                        Some(TableId::new(*table_id))
308                    } else {
309                        None
310                    }
311                }
312                None => None,
313            })
314            .collect();
315        let deleted_table_ids: HashSet<_> = deltas
316            .version_deltas
317            .iter()
318            .flat_map(|version_deltas| {
319                version_deltas
320                    .removed_table_ids
321                    .iter()
322                    .map(|table_id| table_id.into())
323            })
324            .collect();
325        self.table_change_log_notification_sender
326            .send(TableChangeLogNotificationMsg {
327                updated_change_log_table_ids,
328                deleted_table_ids,
329            })
330            .ok();
331
332        self.update_inner(|old_snapshot| {
333            if deltas.version_deltas.is_empty() {
334                return None;
335            }
336            let mut snapshot = old_snapshot.clone();
337            for delta in deltas.version_deltas {
338                snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta));
339            }
340            Some(snapshot)
341        })
342    }
343
344    pub fn add_table_for_test(&self, table_id: TableId) {
345        self.update_inner(|version| {
346            let mut version = version.clone();
347            version.id = version.id.next();
348            version.state_table_info.apply_delta(
349                &HashMap::from_iter([(
350                    table_id,
351                    StateTableInfoDelta {
352                        committed_epoch: INVALID_EPOCH,
353                        compaction_group_id: 0,
354                    },
355                )]),
356                &HashSet::new(),
357            );
358            Some(version)
359        });
360    }
361
362    fn update_inner(
363        &self,
364        get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option<FrontendHummockVersion>,
365    ) {
366        self.latest_snapshot.send_if_modified(move |old_snapshot| {
367            let new_snapshot = get_new_snapshot(&old_snapshot.value);
368            let Some(snapshot) = new_snapshot else {
369                return false;
370            };
371            if snapshot.id <= old_snapshot.value.id {
372                assert_eq!(
373                    snapshot.id, old_snapshot.value.id,
374                    "receive stale frontend version"
375                );
376                return false;
377            }
378            *old_snapshot = Arc::new(PinnedSnapshot { value: snapshot });
379
380            true
381        });
382    }
383
384    /// Wait until the latest snapshot is newer than the given one.
385    pub async fn wait(&self, version_id: HummockVersionId) {
386        let mut rx = self.latest_snapshot.subscribe();
387        while rx.borrow_and_update().value.id < version_id {
388            rx.changed().await.unwrap();
389        }
390    }
391
392    pub async fn wait_table_change_log_notification(
393        &self,
394        table_id: TableId,
395    ) -> Result<(), RwError> {
396        let mut rx = self.table_change_log_notification_sender.subscribe();
397        loop {
398            rx.changed()
399                .await
400                .map_err(|_| ErrorCode::InternalError("cursor notify channel is closed.".into()))?;
401            let table_change_log_notification_msg = rx.borrow_and_update();
402            if table_change_log_notification_msg
403                .deleted_table_ids
404                .contains(&table_id)
405            {
406                return Err(ErrorCode::InternalError(format!(
407                    "Cursor dependent table deleted: table_id is {:?}",
408                    table_id
409                ))
410                .into());
411            }
412            if table_change_log_notification_msg
413                .updated_change_log_table_ids
414                .contains(&table_id)
415            {
416                break;
417            }
418        }
419        Ok(())
420    }
421}