Skip to main content

risingwave_meta/barrier/checkpoint/independent_job/
mod.rs

// Copyright 2026 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::mem::take;
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::Epoch;
20use risingwave_pb::id::FragmentId;
21use risingwave_pb::stream_plan::barrier::PbBarrierKind;
22
23pub(crate) mod batch_refresh_job;
24pub(crate) mod creating_job;
25
26pub(crate) use batch_refresh_job::{
27    BatchRefreshJobCheckpointControl, BatchRefreshJobTriggerContext, BatchRefreshLogicalFragments,
28    BatchRefreshRenderResult,
29};
30pub(crate) use creating_job::CreatingStreamingJobControl;
31
32use crate::barrier::info::BarrierInfo;
33use crate::barrier::notifier::Notifier;
34use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager};
35use crate::barrier::{BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch};
36use crate::controller::fragment::InflightFragmentInfo;
37
38/// Build a fake `BarrierInfo` for independent partial-graph barriers.
39///
40/// Shared by both `CreatingStreamingJobControl` and `BatchRefreshJobCheckpointControl`.
41fn new_fake_barrier(
42    prev_epoch_fake_physical_time: &mut u64,
43    pending_non_checkpoint_barriers: &mut Vec<u64>,
44    kind: PbBarrierKind,
45) -> BarrierInfo {
46    let prev_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
47    *prev_epoch_fake_physical_time += 1;
48    let curr_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
49    let kind = match kind {
50        PbBarrierKind::Unspecified => unreachable!(),
51        PbBarrierKind::Initial => {
52            assert!(pending_non_checkpoint_barriers.is_empty());
53            BarrierKind::Initial
54        }
55        PbBarrierKind::Barrier => {
56            pending_non_checkpoint_barriers.push(prev_epoch.value().0);
57            BarrierKind::Barrier
58        }
59        PbBarrierKind::Checkpoint => {
60            pending_non_checkpoint_barriers.push(prev_epoch.value().0);
61            BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
62        }
63    };
64    BarrierInfo {
65        prev_epoch,
66        curr_epoch,
67        kind,
68    }
69}
70
71// ── Enum unifying independent checkpoint job types ──────────────────────────
72
73/// A streaming job that checkpoints independently from the database's main graph,
74/// using its own partial graph.
75pub(crate) enum IndependentCheckpointJobControl {
76    CreatingStreamingJob(CreatingStreamingJobControl),
77    BatchRefresh(BatchRefreshJobCheckpointControl),
78}
79
80impl IndependentCheckpointJobControl {
81    pub(crate) fn gen_backfill_progress(&self) -> Option<BackfillProgress> {
82        match self {
83            Self::CreatingStreamingJob(j) => Some(j.gen_backfill_progress()),
84            Self::BatchRefresh(j) => j.gen_backfill_progress(),
85        }
86    }
87
88    /// Returns `true` if this job is actively consuming a snapshot.
89    ///
90    /// For creating streaming jobs this is always `true` (they exist only while
91    /// backfilling). Batch refresh jobs are only snapshot-backfilling while in
92    /// `ConsumingSnapshot` or `FinishingSnapshot`; once they transition to `Idle`
93    /// they no longer pin upstream log epochs.
94    pub(crate) fn is_snapshot_backfilling(&self) -> bool {
95        match self {
96            Self::CreatingStreamingJob(_) => true,
97            Self::BatchRefresh(j) => j.is_snapshot_backfilling(),
98        }
99    }
100
101    /// Collect a barrier and return whether a checkpoint should be forced in the next barrier.
102    pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
103        match self {
104            Self::CreatingStreamingJob(j) => j.collect(collected_barrier),
105            Self::BatchRefresh(j) => j.collect(collected_barrier),
106        }
107    }
108
109    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
110        match self {
111            Self::CreatingStreamingJob(j) => j.gen_fragment_backfill_progress(),
112            Self::BatchRefresh(j) => j.gen_fragment_backfill_progress(),
113        }
114    }
115
116    pub(crate) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
117        match self {
118            Self::CreatingStreamingJob(j) => j.pinned_upstream_log_epoch(),
119            Self::BatchRefresh(j) => j.pinned_upstream_log_epoch(),
120        }
121    }
122
123    pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
124        match self {
125            Self::CreatingStreamingJob(j) => j.fragment_infos(),
126            Self::BatchRefresh(j) => j.fragment_infos(),
127        }
128    }
129
130    pub(crate) fn ack_completed(
131        &mut self,
132        partial_graph_manager: &mut PartialGraphManager,
133        epoch: u64,
134    ) {
135        match self {
136            Self::CreatingStreamingJob(j) => j.ack_completed(partial_graph_manager, epoch),
137            Self::BatchRefresh(j) => j.ack_completed(partial_graph_manager, epoch),
138        }
139    }
140
141    pub(crate) fn on_partial_graph_reset(self) {
142        match self {
143            Self::CreatingStreamingJob(j) => j.on_partial_graph_reset(),
144            Self::BatchRefresh(j) => j.on_partial_graph_reset(),
145        }
146    }
147
148    pub(crate) fn drop(
149        &mut self,
150        notifiers: &mut Vec<Notifier>,
151        partial_graph_manager: &mut PartialGraphManager,
152    ) -> bool {
153        match self {
154            Self::CreatingStreamingJob(j) => j.drop(notifiers, partial_graph_manager),
155            Self::BatchRefresh(j) => j.drop(notifiers, partial_graph_manager),
156        }
157    }
158
159    /// Reset during database recovery.
160    ///
161    /// Returns `true` if the partial graph was already resetting (from a prior drop),
162    /// meaning caller should not issue a new reset request.
163    pub(crate) fn reset(self) -> bool {
164        match self {
165            Self::CreatingStreamingJob(j) => j.reset(),
166            Self::BatchRefresh(j) => j.reset(),
167        }
168    }
169}