// Path: risingwave_meta/barrier/checkpoint/independent_job/mod.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::mem::take;
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::Epoch;
20use risingwave_pb::id::FragmentId;
21use risingwave_pb::stream_plan::barrier::PbBarrierKind;
22
23pub(crate) mod creating_job;
24
25pub(crate) use creating_job::CreatingStreamingJobControl;
26
27use crate::barrier::info::BarrierInfo;
28use crate::barrier::notifier::Notifier;
29use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager};
30use crate::barrier::{BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch};
31use crate::controller::fragment::InflightFragmentInfo;
32
33/// Build a fake `BarrierInfo` for independent partial-graph barriers.
34fn new_fake_barrier(
35    prev_epoch_fake_physical_time: &mut u64,
36    pending_non_checkpoint_barriers: &mut Vec<u64>,
37    kind: PbBarrierKind,
38) -> BarrierInfo {
39    let prev_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
40    *prev_epoch_fake_physical_time += 1;
41    let curr_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
42    let kind = match kind {
43        PbBarrierKind::Unspecified => unreachable!(),
44        PbBarrierKind::Initial => {
45            assert!(pending_non_checkpoint_barriers.is_empty());
46            BarrierKind::Initial
47        }
48        PbBarrierKind::Barrier => {
49            pending_non_checkpoint_barriers.push(prev_epoch.value().0);
50            BarrierKind::Barrier
51        }
52        PbBarrierKind::Checkpoint => {
53            pending_non_checkpoint_barriers.push(prev_epoch.value().0);
54            BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
55        }
56    };
57    BarrierInfo {
58        prev_epoch,
59        curr_epoch,
60        kind,
61    }
62}
63
// ── Enum unifying independent checkpoint job types ──────────────────────────

/// A streaming job that checkpoints independently from the database's main graph,
/// using its own partial graph.
///
/// NOTE(review): currently the only variant is a creating streaming job; the
/// method docs in the `impl` also describe batch refresh jobs, which are
/// presumably a planned (or removed) variant — confirm before relying on that.
pub(crate) enum IndependentCheckpointJobControl {
    /// A streaming job still being created (backfilling into its own partial graph).
    CreatingStreamingJob(CreatingStreamingJobControl),
}
71
72impl IndependentCheckpointJobControl {
73    pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
74        match self {
75            Self::CreatingStreamingJob(j) => j.gen_backfill_progress(),
76        }
77    }
78
79    /// Returns `true` if this job is actively consuming a snapshot.
80    ///
81    /// For creating streaming jobs this is always `true` (they exist only while
82    /// backfilling). Batch refresh jobs are only snapshot-backfilling while in
83    /// `ConsumingSnapshot` or `FinishingSnapshot`; once they transition to `Idle`
84    /// they no longer pin upstream log epochs.
85    pub(crate) fn is_snapshot_backfilling(&self) -> bool {
86        match self {
87            Self::CreatingStreamingJob(_) => true,
88        }
89    }
90
91    /// Collect a barrier and return whether a checkpoint should be forced in the next barrier.
92    pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
93        match self {
94            Self::CreatingStreamingJob(j) => j.collect(collected_barrier),
95        }
96    }
97
98    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
99        match self {
100            Self::CreatingStreamingJob(j) => j.gen_fragment_backfill_progress(),
101        }
102    }
103
104    pub(crate) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
105        match self {
106            Self::CreatingStreamingJob(j) => j.pinned_upstream_log_epoch(),
107        }
108    }
109
110    pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
111        match self {
112            Self::CreatingStreamingJob(j) => j.fragment_infos(),
113        }
114    }
115
116    pub(crate) fn ack_completed(
117        &mut self,
118        partial_graph_manager: &mut PartialGraphManager,
119        epoch: u64,
120    ) {
121        match self {
122            Self::CreatingStreamingJob(j) => j.ack_completed(partial_graph_manager, epoch),
123        }
124    }
125
126    pub(crate) fn on_partial_graph_reset(self) {
127        match self {
128            Self::CreatingStreamingJob(j) => j.on_partial_graph_reset(),
129        }
130    }
131
132    pub(crate) fn drop(
133        &mut self,
134        notifiers: &mut Vec<Notifier>,
135        partial_graph_manager: &mut PartialGraphManager,
136    ) -> bool {
137        match self {
138            Self::CreatingStreamingJob(j) => j.drop(notifiers, partial_graph_manager),
139        }
140    }
141
142    /// Reset during database recovery.
143    ///
144    /// Returns `true` if the partial graph was already resetting (from a prior drop),
145    /// meaning caller should not issue a new reset request.
146    pub(crate) fn reset(self) -> bool {
147        match self {
148            Self::CreatingStreamingJob(j) => j.reset(),
149        }
150    }
151}