risingwave_frontend/scheduler/
snapshot.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use risingwave_common::catalog::TableId;
20use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
21use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
22use risingwave_hummock_sdk::{
23    FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
24};
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26use risingwave_pb::common::{BatchQueryCommittedEpoch, BatchQueryEpoch, batch_query_epoch};
27use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
28use tokio::sync::watch;
29
30use crate::error::{ErrorCode, RwError};
31use crate::expr::InlineNowProcTime;
32use crate::meta_client::FrontendMetaClient;
33use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query};
34use crate::scheduler::{SchedulerError, SchedulerResult};
35
/// The storage snapshot to read from in a query, which can be freely cloned.
#[derive(Clone)]
pub enum ReadSnapshot {
    /// A frontend-pinned snapshot.
    FrontendPinned {
        snapshot: PinnedSnapshotRef,
    },

    /// Read the latest data, including uncommitted writes; the batch query epoch is
    /// taken from `Epoch::now()` at the time the query epoch is built.
    ReadUncommitted,

    /// Other arbitrary epoch, e.g. user specified.
    /// Availability and consistency of underlying data should be guaranteed accordingly.
    /// Currently it's only used for querying meta snapshot backup.
    Other(Epoch),
}
51
impl ReadSnapshot {
    /// Get the [`BatchQueryEpoch`] for this snapshot.
    ///
    /// `read_storage_tables` is the set of storage tables the query will read. It is
    /// only consulted for the `FrontendPinned` variant, where the committed epoch is
    /// resolved against the pinned version's per-table info.
    fn batch_query_epoch(
        &self,
        read_storage_tables: &HashSet<TableId>,
    ) -> Result<BatchQueryEpoch, SchedulerError> {
        Ok(match self {
            // Committed read: pair the resolved epoch with the pinned hummock
            // version id it was resolved from.
            ReadSnapshot::FrontendPinned { snapshot } => BatchQueryEpoch {
                epoch: Some(batch_query_epoch::Epoch::Committed(
                    BatchQueryCommittedEpoch {
                        epoch: snapshot.batch_query_epoch(read_storage_tables)?.0,
                        hummock_version_id: snapshot.value.id,
                    },
                )),
            },
            // Uncommitted read: use the current wall-clock epoch.
            ReadSnapshot::ReadUncommitted => BatchQueryEpoch {
                epoch: Some(batch_query_epoch::Epoch::Current(Epoch::now().0)),
            },
            // Backup read: use the caller-specified epoch as-is.
            ReadSnapshot::Other(e) => BatchQueryEpoch {
                epoch: Some(batch_query_epoch::Epoch::Backup(e.0)),
            },
        })
    }

    /// Fill the query epoch of every scan-like plan node in `query` that does not
    /// already have one specified.
    ///
    /// Two passes over the plan: the first collects the table ids of nodes whose
    /// epoch is unspecified, the second writes the epoch resolved for exactly that
    /// set of tables back into those nodes. Nodes with a pre-specified epoch are
    /// left untouched in both passes.
    pub fn fill_batch_query_epoch(&self, query: &mut Query) -> SchedulerResult<()> {
        // Visit `execution_plan_node` and all descendants (children first), calling
        // `f` with the node's mutable `query_epoch` slot and the storage table it
        // reads. Node kinds without a storage table are skipped.
        fn on_node(
            execution_plan_node: &mut ExecutionPlanNode,
            f: &mut impl FnMut(&mut Option<BatchQueryEpoch>, TableId),
        ) {
            for node in &mut execution_plan_node.children {
                on_node(node, f);
            }
            let (query_epoch, table_id) = match &mut execution_plan_node.node {
                NodeBody::RowSeqScan(scan) => (
                    &mut scan.query_epoch,
                    scan.table_desc.as_ref().unwrap().table_id,
                ),
                NodeBody::DistributedLookupJoin(join) => (
                    &mut join.query_epoch,
                    join.inner_side_table_desc.as_ref().unwrap().table_id,
                ),
                NodeBody::LocalLookupJoin(join) => (
                    &mut join.query_epoch,
                    join.inner_side_table_desc.as_ref().unwrap().table_id,
                ),
                NodeBody::VectorIndexNearest(vector_index_read) => (
                    &mut vector_index_read.query_epoch,
                    vector_index_read.reader_desc.as_ref().unwrap().table_id,
                ),
                _ => {
                    return;
                }
            };
            f(query_epoch, table_id);
        }

        // Apply `f` to every scan-like node in every stage of the query.
        fn on_query(query: &mut Query, mut f: impl FnMut(&mut Option<BatchQueryEpoch>, TableId)) {
            for stage in query.stage_graph.stages.values_mut() {
                on_node(&mut stage.root, &mut f);
            }
        }

        // Pass 1: gather the tables whose scan nodes still need an epoch.
        let mut unspecified_epoch_table_ids = HashSet::new();
        on_query(query, |query_epoch, table_id| {
            if query_epoch.is_none() {
                unspecified_epoch_table_ids.insert(table_id);
            }
        });

        // Resolve a single epoch valid for all collected tables.
        let query_epoch = self.batch_query_epoch(&unspecified_epoch_table_ids)?;

        // Pass 2: fill the resolved epoch into the nodes that had none.
        on_query(query, |node_query_epoch, _| {
            if node_query_epoch.is_none() {
                *node_query_epoch = Some(query_epoch);
            }
        });

        Ok(())
    }

    /// Build an [`InlineNowProcTime`] from this snapshot's epoch.
    ///
    /// For a pinned snapshot, the max committed epoch across all tables in the
    /// version is used, falling back to `Epoch::now()` when the version contains
    /// no table; otherwise the current or user-specified epoch is used.
    pub fn inline_now_proc_time(&self) -> InlineNowProcTime {
        let epoch = match self {
            ReadSnapshot::FrontendPinned { snapshot } => snapshot
                .value
                .state_table_info
                .max_table_committed_epoch()
                .map(Epoch)
                .unwrap_or_else(Epoch::now),
            ReadSnapshot::ReadUncommitted => Epoch::now(),
            ReadSnapshot::Other(epoch) => *epoch,
        };
        InlineNowProcTime::new(epoch)
    }

    /// Returns true if this snapshot is a barrier read.
    pub fn support_barrier_read(&self) -> bool {
        matches!(self, ReadSnapshot::ReadUncommitted)
    }
}
151
// DO NOT implement `Clone` for `PinnedSnapshot` because it's a "resource" that should always be a
// singleton for each snapshot. Use `PinnedSnapshotRef` instead.
pub struct PinnedSnapshot {
    // The frontend view of the hummock version this snapshot reads from.
    value: FrontendHummockVersion,
}
157
158impl std::fmt::Debug for PinnedSnapshot {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        self.value.fmt(f)
161    }
162}
163
164/// A reference to a frontend-pinned snapshot.
165pub type PinnedSnapshotRef = Arc<PinnedSnapshot>;
166
167impl PinnedSnapshot {
168    fn batch_query_epoch(
169        &self,
170        read_storage_tables: &HashSet<TableId>,
171    ) -> Result<Epoch, SchedulerError> {
172        // use the min committed epoch of tables involved in the scan
173        let epoch = read_storage_tables
174            .iter()
175            .map(|table_id| {
176                self.value
177                    .state_table_info
178                    .info()
179                    .get(table_id)
180                    .map(|info| Epoch(info.committed_epoch))
181                    .ok_or_else(|| anyhow!("table id {table_id} may have been dropped"))
182            })
183            .try_fold(None, |prev_min_committed_epoch, committed_epoch| {
184                committed_epoch.map(|committed_epoch| {
185                    if let Some(prev_min_committed_epoch) = prev_min_committed_epoch
186                        && prev_min_committed_epoch <= committed_epoch
187                    {
188                        Some(prev_min_committed_epoch)
189                    } else {
190                        Some(committed_epoch)
191                    }
192                })
193            })?
194            .unwrap_or_else(Epoch::now);
195        Ok(epoch)
196    }
197
198    pub fn version(&self) -> &FrontendHummockVersion {
199        &self.value
200    }
201}
202
/// Returns an invalid snapshot, used for initial values.
///
/// Carries `INVALID_VERSION_ID` and an empty state-table-info map, so any real
/// version received later (which has a larger id) will replace it.
fn invalid_snapshot() -> FrontendHummockVersion {
    FrontendHummockVersion {
        id: INVALID_VERSION_ID,
        state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()),
    }
}
210
/// Cache of hummock snapshot in meta.
pub struct HummockSnapshotManager {
    /// The latest snapshot synced from the meta service.
    ///
    /// The `max_committed_epoch` and `max_current_epoch` are pushed from meta node to reduce rpc
    /// number.
    ///
    /// We have two epoch(committed and current), We only use `committed_epoch` to pin or unpin,
    /// because `committed_epoch` always less or equal `current_epoch`, and the data with
    /// `current_epoch` is always in the shared buffer, so it will never be gc before the data
    /// of `committed_epoch`.
    ///
    /// NOTE(review): the paragraphs above mention pin/unpin, which is not visible in this
    /// file — here the snapshot is replaced wholesale through this watch channel and kept
    /// alive by `Arc` reference counting. Confirm whether the doc is stale and trim it.
    latest_snapshot: watch::Sender<PinnedSnapshotRef>,

    /// Fires on every received version update; used to wake waiters such as
    /// `wait_table_change_log_notification`.
    version_update_notification_sender: watch::Sender<()>,
}
226
227pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
228
impl HummockSnapshotManager {
    /// Create a manager seeded with an invalid (empty) snapshot; a real version is
    /// expected later via [`Self::init`] / [`Self::update`].
    ///
    /// The meta client parameter is currently unused.
    pub fn new(_meta_client: Arc<dyn FrontendMetaClient>) -> Self {
        let latest_snapshot = Arc::new(PinnedSnapshot {
            value: invalid_snapshot(),
        });

        // Watch channels are created without subscribers; receivers are obtained
        // on demand via `subscribe()`.
        let (latest_snapshot, _) = watch::channel(latest_snapshot);

        let (version_update_notification_sender, _) = watch::channel(());

        Self {
            latest_snapshot,
            version_update_notification_sender,
        }
    }

    /// Acquire the latest snapshot by increasing its reference count.
    pub fn acquire(&self) -> PinnedSnapshotRef {
        self.latest_snapshot.borrow().clone()
    }

    /// Install a full version as the latest snapshot.
    ///
    /// NOTE(review): unlike `update`, the notification here is sent *before* the
    /// snapshot is swapped in, so a waiter woken by it may still observe the old
    /// snapshot — confirm whether this ordering is intentional (e.g. harmless at
    /// startup before any waiter exists).
    pub fn init(&self, version: FrontendHummockVersion) {
        self.version_update_notification_sender.send(()).ok();

        self.update_inner(|_| Some(version));
    }

    /// Update the latest snapshot.
    ///
    /// Should only be called by the observer manager.
    pub fn update(&self, deltas: HummockVersionDeltas) {
        self.update_inner(|old_snapshot| {
            // No deltas: keep the current snapshot and skip notifying watchers of
            // `latest_snapshot` (the version-update notification below still fires).
            if deltas.version_deltas.is_empty() {
                return None;
            }
            // Apply all deltas on a copy of the current version, in order.
            let mut snapshot = old_snapshot.clone();
            for delta in deltas.version_deltas {
                snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta));
            }
            Some(snapshot)
        });
        self.version_update_notification_sender.send(()).ok();
    }

    /// Register `table_id` with an invalid committed epoch, bumping the version id.
    /// Test-only helper.
    pub fn add_table_for_test(&self, table_id: TableId) {
        self.update_inner(|version| {
            let mut version = version.clone();
            // Bump the id so the monotonicity check in `update_inner` accepts it.
            version.id += 1;
            version.state_table_info.apply_delta(
                &HashMap::from_iter([(
                    table_id,
                    StateTableInfoDelta {
                        committed_epoch: INVALID_EPOCH,
                        compaction_group_id: 0.into(),
                    },
                )]),
                &HashSet::new(),
            );
            Some(version)
        });
    }

    /// Replace the latest snapshot with the version produced by `get_new_snapshot`,
    /// notifying `latest_snapshot` watchers only when it actually changes.
    ///
    /// Versions must arrive with strictly increasing ids: an equal id is silently
    /// ignored, while a smaller id panics as a stale delivery.
    fn update_inner(
        &self,
        get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option<FrontendHummockVersion>,
    ) {
        self.latest_snapshot.send_if_modified(move |old_snapshot| {
            let new_snapshot = get_new_snapshot(&old_snapshot.value);
            let Some(snapshot) = new_snapshot else {
                return false;
            };
            if snapshot.id <= old_snapshot.value.id {
                assert_eq!(
                    snapshot.id, old_snapshot.value.id,
                    "receive stale frontend version"
                );
                return false;
            }
            *old_snapshot = Arc::new(PinnedSnapshot { value: snapshot });

            true
        });
    }

    /// Wait until the latest snapshot is newer than the given one.
    pub async fn wait(&self, version_id: HummockVersionId) {
        let mut rx = self.latest_snapshot.subscribe();
        while rx.borrow_and_update().value.id < version_id {
            rx.changed().await.unwrap();
        }
    }

    /// Block until `table_id`'s committed epoch reaches `seek_timestamp`.
    ///
    /// Wakes on every version-update notification and re-checks the latest
    /// snapshot. Errors if the notification channel closes or the table is no
    /// longer present in the version (treated as dropped).
    pub async fn wait_table_change_log_notification(
        &self,
        table_id: TableId,
        seek_timestamp: u64,
    ) -> Result<(), RwError> {
        let mut rx = self.version_update_notification_sender.subscribe();
        loop {
            rx.changed()
                .await
                .map_err(|_| ErrorCode::InternalError("cursor notify channel is closed.".into()))?;
            // Mark the current notification as seen so `changed()` waits for the next one.
            let _ = rx.borrow_and_update();
            if let Some(info) = self
                .acquire()
                .version()
                .state_table_info
                .info()
                .get(&table_id)
            {
                if info.committed_epoch >= seek_timestamp {
                    break;
                }
            } else {
                return Err(ErrorCode::InternalError(format!(
                    "Cursor dependent table deleted: table_id is {:?}",
                    table_id
                ))
                .into());
            }
        }
        Ok(())
    }
}