risingwave_frontend/scheduler/
snapshot.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
21use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
22use risingwave_hummock_sdk::{
23    FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
24};
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch, batch_query_epoch};
27use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
28use tokio::sync::watch;
29
30use crate::error::{ErrorCode, RwError};
31use crate::expr::InlineNowProcTime;
32use crate::meta_client::FrontendMetaClient;
33use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query};
34use crate::scheduler::{SchedulerError, SchedulerResult};
35
/// The storage snapshot to read from in a query, which can be freely cloned.
///
/// The variant decides which kind of `BatchQueryEpoch` is handed to the batch plan nodes.
#[derive(Clone)]
pub enum ReadSnapshot {
    /// A frontend-pinned snapshot.
    ///
    /// Queries read at a committed epoch taken from the pinned hummock version.
    FrontendPinned {
        snapshot: PinnedSnapshotRef,
    },

    /// Read without a pinned committed snapshot.
    ///
    /// This is translated to `batch_query_epoch::Epoch::Current(u64::MAX)` and is the only
    /// variant for which `support_barrier_read` returns `true`.
    ReadUncommitted,

    /// Other arbitrary epoch, e.g. user specified.
    /// Availability and consistency of underlying data should be guaranteed accordingly.
    /// Currently it's only used for querying meta snapshot backup.
    Other(Epoch),
}
51
52impl ReadSnapshot {
53    /// Get the [`BatchQueryEpoch`] for this snapshot.
54    fn batch_query_epoch(
55        &self,
56        read_storage_tables: &HashSet<TableId>,
57    ) -> Result<BatchQueryEpoch, SchedulerError> {
58        Ok(match self {
59            ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch {
60                epoch: Some(batch_query_epoch::Epoch::Committed(
61                    BatchQueryCommittedEpoch {
62                        epoch: snapshot.batch_query_epoch(read_storage_tables)?.0,
63                        hummock_version_id: snapshot.value.id.to_u64(),
64                    },
65                )),
66            },
67            ReadSnapshot::ReadUncommitted => BatchQueryEpoch {
68                epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
69            },
70            ReadSnapshot::Other(e) => BatchQueryEpoch {
71                epoch: Some(batch_query_epoch::Epoch::Backup(e.0)),
72            },
73        })
74    }
75
76    pub fn fill_batch_query_epoch(&self, query: &mut Query) -> SchedulerResult<()> {
77        fn on_node(
78            execution_plan_node: &mut ExecutionPlanNode,
79            f: &mut impl FnMut(&mut Option<BatchQueryEpoch>, u32),
80        ) {
81            for node in &mut execution_plan_node.children {
82                on_node(node, f);
83            }
84            let (query_epoch, table_id) = match &mut execution_plan_node.node {
85                NodeBody::RowSeqScan(scan) => (
86                    &mut scan.query_epoch,
87                    scan.table_desc.as_ref().unwrap().table_id,
88                ),
89                NodeBody::DistributedLookupJoin(join) => (
90                    &mut join.query_epoch,
91                    join.inner_side_table_desc.as_ref().unwrap().table_id,
92                ),
93                NodeBody::LocalLookupJoin(join) => (
94                    &mut join.query_epoch,
95                    join.inner_side_table_desc.as_ref().unwrap().table_id,
96                ),
97                NodeBody::VectorIndexNearest(vector_index_read) => (
98                    &mut vector_index_read.query_epoch,
99                    vector_index_read.reader_desc.as_ref().unwrap().table_id,
100                ),
101                _ => {
102                    return;
103                }
104            };
105            f(query_epoch, table_id);
106        }
107
108        fn on_query(query: &mut Query, mut f: impl FnMut(&mut Option<BatchQueryEpoch>, u32)) {
109            for stage in query.stage_graph.stages.values_mut() {
110                on_node(&mut stage.root, &mut f);
111            }
112        }
113
114        let mut unspecified_epoch_table_ids = HashSet::new();
115        on_query(query, |query_epoch, table_id| {
116            if query_epoch.is_none() {
117                unspecified_epoch_table_ids.insert(table_id.into());
118            }
119        });
120
121        let query_epoch = self.batch_query_epoch(&unspecified_epoch_table_ids)?;
122
123        on_query(query, |node_query_epoch, _| {
124            if node_query_epoch.is_none() {
125                *node_query_epoch = Some(query_epoch);
126            }
127        });
128
129        Ok(())
130    }
131
132    pub fn inline_now_proc_time(&self) -> InlineNowProcTime {
133        let epoch = match self {
134            ReadSnapshot::FrontendPinned { snapshot } => snapshot
135                .value
136                .state_table_info
137                .max_table_committed_epoch()
138                .map(Epoch)
139                .unwrap_or_else(Epoch::now),
140            ReadSnapshot::ReadUncommitted => Epoch::now(),
141            ReadSnapshot::Other(epoch) => *epoch,
142        };
143        InlineNowProcTime::new(epoch)
144    }
145
146    /// Returns true if this snapshot is a barrier read.
147    pub fn support_barrier_read(&self) -> bool {
148        matches!(self, ReadSnapshot::ReadUncommitted)
149    }
150}
151
/// A hummock version held by the frontend for serving reads.
//
// DO NOT implement `Clone` for `PinnedSnapshot` because it's a "resource" that should always be a
// singleton for each snapshot. Use `PinnedSnapshotRef` instead.
pub struct PinnedSnapshot {
    /// The frontend's view of the hummock version backing this snapshot.
    value: FrontendHummockVersion,
}
157
158impl std::fmt::Debug for PinnedSnapshot {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        self.value.fmt(f)
161    }
162}
163
/// A shared, reference-counted handle to a frontend-pinned snapshot.
///
/// Cloning the handle only bumps the reference count; the [`PinnedSnapshot`] itself is never
/// cloned.
pub type PinnedSnapshotRef = Arc<PinnedSnapshot>;
166
167impl PinnedSnapshot {
168    fn batch_query_epoch(
169        &self,
170        read_storage_tables: &HashSet<TableId>,
171    ) -> Result<Epoch, SchedulerError> {
172        // use the min committed epoch of tables involved in the scan
173        let epoch = read_storage_tables
174            .iter()
175            .map(|table_id| {
176                self.value
177                    .state_table_info
178                    .info()
179                    .get(table_id)
180                    .map(|info| Epoch(info.committed_epoch))
181                    .ok_or_else(|| anyhow!("table id {table_id} may have been dropped"))
182            })
183            .try_fold(None, |prev_min_committed_epoch, committed_epoch| {
184                committed_epoch.map(|committed_epoch| {
185                    if let Some(prev_min_committed_epoch) = prev_min_committed_epoch
186                        && prev_min_committed_epoch <= committed_epoch
187                    {
188                        Some(prev_min_committed_epoch)
189                    } else {
190                        Some(committed_epoch)
191                    }
192                })
193            })?
194            .unwrap_or_else(Epoch::now);
195        Ok(epoch)
196    }
197
198    pub fn version(&self) -> &FrontendHummockVersion {
199        &self.value
200    }
201
202    pub fn list_change_log_epochs(
203        &self,
204        table_id: u32,
205        min_epoch: u64,
206        max_count: u32,
207    ) -> Vec<u64> {
208        if let Some(table_change_log) = self.value.table_change_log.get(&TableId::new(table_id)) {
209            let table_change_log = table_change_log.clone();
210            table_change_log.get_non_empty_epochs(min_epoch, max_count as usize)
211        } else {
212            vec![]
213        }
214    }
215}
216
217/// Returns an invalid snapshot, used for initial values.
218fn invalid_snapshot() -> FrontendHummockVersion {
219    FrontendHummockVersion {
220        id: INVALID_VERSION_ID,
221        state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()),
222        table_change_log: Default::default(),
223    }
224}
225
/// Cache of the hummock snapshot held by meta.
pub struct HummockSnapshotManager {
    /// The latest snapshot synced from the meta service.
    ///
    /// The `max_committed_epoch` and `max_current_epoch` are pushed from the meta node to reduce
    /// the number of RPCs.
    ///
    /// There are two epochs (committed and current). Only `committed_epoch` is used to pin or
    /// unpin, because `committed_epoch` is always less than or equal to `current_epoch`, and the
    /// data of `current_epoch` is always in the shared buffer, so it will never be GC-ed before
    /// the data of `committed_epoch`.
    ///
    /// NOTE(review): the epoch names above do not appear as fields of the stored version in this
    /// file — confirm this description is still accurate.
    latest_snapshot: watch::Sender<PinnedSnapshotRef>,

    /// Publishes which tables received change-log updates and which tables were removed each
    /// time a version is installed or version deltas are applied.
    table_change_log_notification_sender: watch::Sender<TableChangeLogNotificationMsg>,
}
241
/// Payload of the table change-log notification channel.
#[derive(Default)]
struct TableChangeLogNotificationMsg {
    /// Ids of tables whose change log has non-empty content in the notified update.
    updated_change_log_table_ids: HashSet<u32>,
    /// Ids of tables removed by the notified update.
    deleted_table_ids: HashSet<u32>,
}
247
/// A shared, reference-counted handle to a [`HummockSnapshotManager`].
pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
249
250impl HummockSnapshotManager {
251    pub fn new(_meta_client: Arc<dyn FrontendMetaClient>) -> Self {
252        let latest_snapshot = Arc::new(PinnedSnapshot {
253            value: invalid_snapshot(),
254        });
255
256        let (latest_snapshot, _) = watch::channel(latest_snapshot);
257
258        let (table_change_log_notification_sender, _) =
259            watch::channel(TableChangeLogNotificationMsg::default());
260
261        Self {
262            latest_snapshot,
263            table_change_log_notification_sender,
264        }
265    }
266
267    /// Acquire the latest snapshot by increasing its reference count.
268    pub fn acquire(&self) -> PinnedSnapshotRef {
269        self.latest_snapshot.borrow().clone()
270    }
271
272    pub fn init(&self, version: FrontendHummockVersion) {
273        let updated_change_log_table_ids: HashSet<_> = version
274            .table_change_log
275            .iter()
276            .filter_map(|(table_id, change_log)| {
277                if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() {
278                    None
279                } else {
280                    Some(table_id.table_id())
281                }
282            })
283            .collect();
284        self.table_change_log_notification_sender
285            .send(TableChangeLogNotificationMsg {
286                updated_change_log_table_ids,
287                deleted_table_ids: Default::default(),
288            })
289            .ok();
290
291        self.update_inner(|_| Some(version));
292    }
293
294    /// Update the latest snapshot.
295    ///
296    /// Should only be called by the observer manager.
297    pub fn update(&self, deltas: HummockVersionDeltas) {
298        let updated_change_log_table_ids: HashSet<_> = deltas
299            .version_deltas
300            .iter()
301            .flat_map(|version_deltas| &version_deltas.change_log_delta)
302            .filter_map(|(table_id, change_log)| match change_log.new_log.as_ref() {
303                Some(new_log) => {
304                    let new_value_empty = new_log.new_value.is_empty();
305                    let old_value_empty = new_log.old_value.is_empty();
306                    if !new_value_empty || !old_value_empty {
307                        Some(*table_id)
308                    } else {
309                        None
310                    }
311                }
312                None => None,
313            })
314            .collect();
315        let deleted_table_ids: HashSet<_> = deltas
316            .version_deltas
317            .iter()
318            .flat_map(|version_deltas| version_deltas.removed_table_ids.clone())
319            .collect();
320        self.table_change_log_notification_sender
321            .send(TableChangeLogNotificationMsg {
322                updated_change_log_table_ids,
323                deleted_table_ids,
324            })
325            .ok();
326
327        self.update_inner(|old_snapshot| {
328            if deltas.version_deltas.is_empty() {
329                return None;
330            }
331            let mut snapshot = old_snapshot.clone();
332            for delta in deltas.version_deltas {
333                snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta));
334            }
335            Some(snapshot)
336        })
337    }
338
339    pub fn add_table_for_test(&self, table_id: TableId) {
340        self.update_inner(|version| {
341            let mut version = version.clone();
342            version.id = version.id.next();
343            version.state_table_info.apply_delta(
344                &HashMap::from_iter([(
345                    table_id,
346                    StateTableInfoDelta {
347                        committed_epoch: INVALID_EPOCH,
348                        compaction_group_id: 0,
349                    },
350                )]),
351                &HashSet::new(),
352            );
353            Some(version)
354        });
355    }
356
357    fn update_inner(
358        &self,
359        get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option<FrontendHummockVersion>,
360    ) {
361        self.latest_snapshot.send_if_modified(move |old_snapshot| {
362            let new_snapshot = get_new_snapshot(&old_snapshot.value);
363            let Some(snapshot) = new_snapshot else {
364                return false;
365            };
366            if snapshot.id <= old_snapshot.value.id {
367                assert_eq!(
368                    snapshot.id, old_snapshot.value.id,
369                    "receive stale frontend version"
370                );
371                return false;
372            }
373            *old_snapshot = Arc::new(PinnedSnapshot { value: snapshot });
374
375            true
376        });
377    }
378
379    /// Wait until the latest snapshot is newer than the given one.
380    pub async fn wait(&self, version_id: HummockVersionId) {
381        let mut rx = self.latest_snapshot.subscribe();
382        while rx.borrow_and_update().value.id < version_id {
383            rx.changed().await.unwrap();
384        }
385    }
386
387    pub async fn wait_table_change_log_notification(&self, table_id: u32) -> Result<(), RwError> {
388        let mut rx = self.table_change_log_notification_sender.subscribe();
389        loop {
390            rx.changed()
391                .await
392                .map_err(|_| ErrorCode::InternalError("cursor notify channel is closed.".into()))?;
393            let table_change_log_notification_msg = rx.borrow_and_update();
394            if table_change_log_notification_msg
395                .deleted_table_ids
396                .contains(&table_id)
397            {
398                return Err(ErrorCode::InternalError(format!(
399                    "Cursor dependent table deleted: table_id is {:?}",
400                    table_id
401                ))
402                .into());
403            }
404            if table_change_log_notification_msg
405                .updated_change_log_table_ids
406                .contains(&table_id)
407            {
408                break;
409            }
410        }
411        Ok(())
412    }
413}