risingwave_frontend/scheduler/snapshot.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{
    FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
};
use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch, batch_query_epoch};
use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
use tokio::sync::watch;

use crate::error::{ErrorCode, RwError};
use crate::expr::InlineNowProcTime;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::SchedulerError;

/// The storage snapshot to read from in a query, which can be freely cloned.
#[derive(Clone)]
pub enum ReadSnapshot {
    /// A frontend-pinned snapshot.
    FrontendPinned {
        snapshot: PinnedSnapshotRef,
    },

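    /// Read the latest data regardless of whether it has been committed by a barrier,
    /// i.e. a "current epoch" read that may observe uncommitted changes.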
    ReadUncommitted,

    /// Some other arbitrary epoch, e.g. one specified by the user.
    /// Availability and consistency of the underlying data must be guaranteed accordingly.
    /// Currently it's only used for querying a meta snapshot backup.
    Other(Epoch),
}

impl ReadSnapshot {
    /// Get the [`BatchQueryEpoch`] for this snapshot.
    pub fn batch_query_epoch(
        &self,
        read_storage_tables: &HashSet<TableId>,
    ) -> Result<BatchQueryEpoch, SchedulerError> {
        Ok(match self {
            ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch {
                epoch: Some(batch_query_epoch::Epoch::Committed(
                    BatchQueryCommittedEpoch {
                        epoch: snapshot.batch_query_epoch(read_storage_tables)?.0,
                        hummock_version_id: snapshot.value.id.to_u64(),
                    },
                )),
            },
            ReadSnapshot::ReadUncommitted => BatchQueryEpoch {
                epoch: Some(batch_query_epoch::Epoch::Current(u64::MAX)),
            },
            ReadSnapshot::Other(e) => BatchQueryEpoch {
                epoch: Some(batch_query_epoch::Epoch::Backup(e.0)),
            },
        })
    }

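    /// Get the epoch used to inline `now()` / proctime expressions under this snapshot,
    /// falling back to the current wall-clock epoch when no committed epoch is available.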
    pub fn inline_now_proc_time(&self) -> InlineNowProcTime {
        let epoch = match self {
            ReadSnapshot::FrontendPinned { snapshot } => snapshot
                .value
                .state_table_info
                .max_table_committed_epoch()
                .map(Epoch)
                .unwrap_or_else(Epoch::now),
            ReadSnapshot::ReadUncommitted => Epoch::now(),
            ReadSnapshot::Other(epoch) => *epoch,
        };
        InlineNowProcTime::new(epoch)
    }

    /// Returns `true` if this snapshot supports barrier read, i.e. it reads uncommitted data.
    pub fn support_barrier_read(&self) -> bool {
        matches!(self, ReadSnapshot::ReadUncommitted)
    }
}

// DO NOT implement `Clone` for `PinnedSnapshot` because it's a "resource" that should always be a
// singleton for each snapshot. Use `PinnedSnapshotRef` instead.
pub struct PinnedSnapshot {
    value: FrontendHummockVersion,
}

impl std::fmt::Debug for PinnedSnapshot {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.value.fmt(f)
    }
}

/// A reference to a frontend-pinned snapshot.
pub type PinnedSnapshotRef = Arc<PinnedSnapshot>;

impl PinnedSnapshot {
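    /// Compute the epoch that a batch query should read at, given the storage tables it scans.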
    fn batch_query_epoch(
        &self,
        read_storage_tables: &HashSet<TableId>,
    ) -> Result<Epoch, SchedulerError> {
        // use the min committed epoch of tables involved in the scan
        let epoch = read_storage_tables
            .iter()
            .map(|table_id| {
                self.value
                    .state_table_info
                    .info()
                    .get(table_id)
                    .map(|info| Epoch(info.committed_epoch))
                    .ok_or_else(|| anyhow!("table id {table_id} may have been dropped"))
            })
            .try_fold(None, |prev_min_committed_epoch, committed_epoch| {
                committed_epoch.map(|committed_epoch| {
                    if let Some(prev_min_committed_epoch) = prev_min_committed_epoch
                        && prev_min_committed_epoch <= committed_epoch
                    {
                        Some(prev_min_committed_epoch)
                    } else {
                        Some(committed_epoch)
                    }
                })
            })?
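            // If the query reads no storage tables, fall back to the current wall-clock epoch.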
            .unwrap_or_else(Epoch::now);
        Ok(epoch)
    }

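    /// Get the underlying [`FrontendHummockVersion`] of this snapshot.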
    pub fn version(&self) -> &FrontendHummockVersion {
        &self.value
    }

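    /// List up to `max_count` epochs, starting from `min_epoch`, at which the change log of the
    /// given table is non-empty. Returns an empty vector if the table has no change log.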
    pub fn list_change_log_epochs(
        &self,
        table_id: u32,
        min_epoch: u64,
        max_count: u32,
    ) -> Vec<u64> {
        if let Some(table_change_log) = self.value.table_change_log.get(&TableId::new(table_id)) {
            let table_change_log = table_change_log.clone();
            table_change_log.get_non_empty_epochs(min_epoch, max_count as usize)
        } else {
            vec![]
        }
    }
}

/// Returns an invalid snapshot, used for initial values.
fn invalid_snapshot() -> FrontendHummockVersion {
    FrontendHummockVersion {
        id: INVALID_VERSION_ID,
        state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()),
        table_change_log: Default::default(),
    }
}

/// A cache of the Hummock snapshot synced from the meta service.
pub struct HummockSnapshotManager {
    /// The latest snapshot synced from the meta service.
    ///
    /// The `max_committed_epoch` and `max_current_epoch` are pushed from the meta node to reduce
    /// the number of RPCs.
    ///
    /// We have two epochs (committed and current), but we only use `committed_epoch` to pin or
    /// unpin, because `committed_epoch` is always less than or equal to `current_epoch`, and the
    /// data at `current_epoch` is always in the shared buffer, so it will never be GCed before
    /// the data of `committed_epoch`.
    latest_snapshot: watch::Sender<PinnedSnapshotRef>,

    table_change_log_notification_sender: watch::Sender<TableChangeLogNotificationMsg>,
}

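/// Notification message describing which tables have new change log entries and which tables
/// have been dropped in the latest version update.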
#[derive(Default)]
struct TableChangeLogNotificationMsg {
    updated_change_log_table_ids: HashSet<u32>,
    deleted_table_ids: HashSet<u32>,
}

pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;

impl HummockSnapshotManager {
    pub fn new(_meta_client: Arc<dyn FrontendMetaClient>) -> Self {
        let latest_snapshot = Arc::new(PinnedSnapshot {
            value: invalid_snapshot(),
        });

        let (latest_snapshot, _) = watch::channel(latest_snapshot);

        let (table_change_log_notification_sender, _) =
            watch::channel(TableChangeLogNotificationMsg::default());

        Self {
            latest_snapshot,
            table_change_log_notification_sender,
        }
    }

    /// Acquire the latest snapshot by increasing its reference count.
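    ///
    /// A typical (sketched) usage is to wrap the result into a [`ReadSnapshot::FrontendPinned`]
    /// and derive the batch query epoch from it (`manager` and `read_storage_tables` are
    /// placeholders here):
    ///
    /// ```ignore
    /// let snapshot = ReadSnapshot::FrontendPinned { snapshot: manager.acquire() };
    /// let epoch = snapshot.batch_query_epoch(&read_storage_tables)?;
    /// ```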
    pub fn acquire(&self) -> PinnedSnapshotRef {
        self.latest_snapshot.borrow().clone()
    }

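    /// Initialize the cached snapshot with a full [`FrontendHummockVersion`] and notify
    /// subscribers about tables that already have non-empty change logs.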
    pub fn init(&self, version: FrontendHummockVersion) {
        let updated_change_log_table_ids: HashSet<_> = version
            .table_change_log
            .iter()
            .filter_map(|(table_id, change_log)| {
                if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() {
                    None
                } else {
                    Some(table_id.table_id())
                }
            })
            .collect();
        self.table_change_log_notification_sender
            .send(TableChangeLogNotificationMsg {
                updated_change_log_table_ids,
                deleted_table_ids: Default::default(),
            })
            .ok();

        self.update_inner(|_| Some(version));
    }

    /// Update the latest snapshot.
    ///
    /// Should only be called by the observer manager.
    pub fn update(&self, deltas: HummockVersionDeltas) {
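        // Collect the tables whose change logs received new, non-empty entries in these deltas.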
        let updated_change_log_table_ids: HashSet<_> = deltas
            .version_deltas
            .iter()
            .flat_map(|version_deltas| &version_deltas.change_log_delta)
            .filter_map(|(table_id, change_log)| match change_log.new_log.as_ref() {
                Some(new_log) => {
                    let new_value_empty = new_log.new_value.is_empty();
                    let old_value_empty = new_log.old_value.is_empty();
                    if !new_value_empty || !old_value_empty {
                        Some(*table_id)
                    } else {
                        None
                    }
                }
                None => None,
            })
            .collect();
        let deleted_table_ids: HashSet<_> = deltas
            .version_deltas
            .iter()
            .flat_map(|version_deltas| version_deltas.removed_table_ids.clone())
            .collect();
        self.table_change_log_notification_sender
            .send(TableChangeLogNotificationMsg {
                updated_change_log_table_ids,
                deleted_table_ids,
            })
            .ok();

        self.update_inner(|old_snapshot| {
            if deltas.version_deltas.is_empty() {
                return None;
            }
            let mut snapshot = old_snapshot.clone();
            for delta in deltas.version_deltas {
                snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta));
            }
            Some(snapshot)
        })
    }

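    /// Register a new table with an invalid committed epoch in the cached version. For tests only.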
    pub fn add_table_for_test(&self, table_id: TableId) {
        self.update_inner(|version| {
            let mut version = version.clone();
            version.id = version.id.next();
            version.state_table_info.apply_delta(
                &HashMap::from_iter([(
                    table_id,
                    StateTableInfoDelta {
                        committed_epoch: INVALID_EPOCH,
                        compaction_group_id: 0,
                    },
                )]),
                &HashSet::new(),
            );
            Some(version)
        });
    }

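    /// Apply `get_new_snapshot` to the cached version and publish the result to subscribers if it
    /// yields a newer version; returning `None` or an unchanged version leaves the cache
    /// untouched.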
    fn update_inner(
        &self,
        get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option<FrontendHummockVersion>,
    ) {
        self.latest_snapshot.send_if_modified(move |old_snapshot| {
            let new_snapshot = get_new_snapshot(&old_snapshot.value);
            let Some(snapshot) = new_snapshot else {
                return false;
            };
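            // The version ID must advance monotonically: an unchanged ID is a no-op, and a
            // smaller ID indicates a stale version, which is treated as a bug.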
            if snapshot.id <= old_snapshot.value.id {
                assert_eq!(
                    snapshot.id, old_snapshot.value.id,
                    "receive stale frontend version"
                );
                return false;
            }
            *old_snapshot = Arc::new(PinnedSnapshot { value: snapshot });

            true
        });
    }

    /// Wait until the latest snapshot's version is at least as new as the given version.
    pub async fn wait(&self, version_id: HummockVersionId) {
        let mut rx = self.latest_snapshot.subscribe();
        while rx.borrow_and_update().value.id < version_id {
            rx.changed().await.unwrap();
        }
    }

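    /// Wait until the change log of the given table is updated. Returns an error if the table is
    /// dropped or the notification channel is closed.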
    pub async fn wait_table_change_log_notification(&self, table_id: u32) -> Result<(), RwError> {
        let mut rx = self.table_change_log_notification_sender.subscribe();
        loop {
            rx.changed()
                .await
                .map_err(|_| ErrorCode::InternalError("cursor notify channel is closed.".into()))?;
            let table_change_log_notification_msg = rx.borrow_and_update();
            if table_change_log_notification_msg
                .deleted_table_ids
                .contains(&table_id)
            {
                return Err(ErrorCode::InternalError(format!(
                    "Cursor dependent table deleted: table_id is {:?}",
                    table_id
                ))
                .into());
            }
            if table_change_log_notification_msg
                .updated_change_log_table_ids
                .contains(&table_id)
            {
                break;
            }
        }
        Ok(())
    }
}