risingwave_meta/barrier/checkpoint/independent_job/
mod.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::mem::take;
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::Epoch;
20use risingwave_pb::id::FragmentId;
21use risingwave_pb::stream_plan::barrier::PbBarrierKind;
22
23pub(crate) mod batch_refresh_job;
24pub(crate) mod creating_job;
25
26pub(crate) use batch_refresh_job::{
27    BatchRefreshJobCheckpointControl, BatchRefreshLogicalFragments, BatchRefreshRenderResult,
28};
29pub(crate) use creating_job::CreatingStreamingJobControl;
30
31use crate::barrier::info::BarrierInfo;
32use crate::barrier::notifier::Notifier;
33use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager};
34use crate::barrier::{BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch};
35use crate::controller::fragment::InflightFragmentInfo;
36
37/// Build a fake `BarrierInfo` for independent partial-graph barriers.
38///
39/// Shared by both `CreatingStreamingJobControl` and `BatchRefreshJobCheckpointControl`.
40fn new_fake_barrier(
41    prev_epoch_fake_physical_time: &mut u64,
42    pending_non_checkpoint_barriers: &mut Vec<u64>,
43    kind: PbBarrierKind,
44) -> BarrierInfo {
45    let prev_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
46    *prev_epoch_fake_physical_time += 1;
47    let curr_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
48    let kind = match kind {
49        PbBarrierKind::Unspecified => unreachable!(),
50        PbBarrierKind::Initial => {
51            assert!(pending_non_checkpoint_barriers.is_empty());
52            BarrierKind::Initial
53        }
54        PbBarrierKind::Barrier => {
55            pending_non_checkpoint_barriers.push(prev_epoch.value().0);
56            BarrierKind::Barrier
57        }
58        PbBarrierKind::Checkpoint => {
59            pending_non_checkpoint_barriers.push(prev_epoch.value().0);
60            BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
61        }
62    };
63    BarrierInfo {
64        prev_epoch,
65        curr_epoch,
66        kind,
67    }
68}
69
70// ── Enum unifying independent checkpoint job types ──────────────────────────
71
72/// A streaming job that checkpoints independently from the database's main graph,
73/// using its own partial graph.
74pub(crate) enum IndependentCheckpointJobControl {
75    CreatingStreamingJob(CreatingStreamingJobControl),
76    BatchRefresh(BatchRefreshJobCheckpointControl),
77}
78
79impl IndependentCheckpointJobControl {
80    pub(crate) fn gen_backfill_progress(&self) -> Option<BackfillProgress> {
81        match self {
82            Self::CreatingStreamingJob(j) => Some(j.gen_backfill_progress()),
83            Self::BatchRefresh(j) => j.gen_backfill_progress(),
84        }
85    }
86
87    /// Returns `true` if this job is actively consuming a snapshot.
88    ///
89    /// For creating streaming jobs this is always `true` (they exist only while
90    /// backfilling). Batch refresh jobs are only snapshot-backfilling while in
91    /// `ConsumingSnapshot` or `FinishingSnapshot`; once they transition to `Idle`
92    /// they no longer pin upstream log epochs.
93    pub(crate) fn is_snapshot_backfilling(&self) -> bool {
94        match self {
95            Self::CreatingStreamingJob(_) => true,
96            Self::BatchRefresh(j) => j.is_snapshot_backfilling(),
97        }
98    }
99
100    /// Collect a barrier and return whether a checkpoint should be forced in the next barrier.
101    pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
102        match self {
103            Self::CreatingStreamingJob(j) => j.collect(collected_barrier),
104            Self::BatchRefresh(j) => j.collect(collected_barrier),
105        }
106    }
107
108    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
109        match self {
110            Self::CreatingStreamingJob(j) => j.gen_fragment_backfill_progress(),
111            Self::BatchRefresh(j) => j.gen_fragment_backfill_progress(),
112        }
113    }
114
115    pub(crate) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
116        match self {
117            Self::CreatingStreamingJob(j) => j.pinned_upstream_log_epoch(),
118            Self::BatchRefresh(j) => j.pinned_upstream_log_epoch(),
119        }
120    }
121
122    pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
123        match self {
124            Self::CreatingStreamingJob(j) => j.fragment_infos(),
125            Self::BatchRefresh(j) => j.fragment_infos(),
126        }
127    }
128
129    pub(crate) fn ack_completed(
130        &mut self,
131        partial_graph_manager: &mut PartialGraphManager,
132        epoch: u64,
133    ) {
134        match self {
135            Self::CreatingStreamingJob(j) => j.ack_completed(partial_graph_manager, epoch),
136            Self::BatchRefresh(j) => j.ack_completed(partial_graph_manager, epoch),
137        }
138    }
139
140    pub(crate) fn on_partial_graph_reset(self) {
141        match self {
142            Self::CreatingStreamingJob(j) => j.on_partial_graph_reset(),
143            Self::BatchRefresh(j) => j.on_partial_graph_reset(),
144        }
145    }
146
147    pub(crate) fn drop(
148        &mut self,
149        notifiers: &mut Vec<Notifier>,
150        partial_graph_manager: &mut PartialGraphManager,
151    ) -> bool {
152        match self {
153            Self::CreatingStreamingJob(j) => j.drop(notifiers, partial_graph_manager),
154            Self::BatchRefresh(j) => j.drop(notifiers, partial_graph_manager),
155        }
156    }
157
158    /// Reset during database recovery.
159    ///
160    /// Returns `true` if the partial graph was already resetting (from a prior drop),
161    /// meaning caller should not issue a new reset request.
162    pub(crate) fn reset(self) -> bool {
163        match self {
164            Self::CreatingStreamingJob(j) => j.reset(),
165            Self::BatchRefresh(j) => j.reset(),
166        }
167    }
168}