risingwave_meta/barrier/checkpoint/independent_job/
mod.rs1use std::collections::{HashMap, HashSet};
16use std::mem::take;
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::Epoch;
20use risingwave_pb::id::FragmentId;
21use risingwave_pb::stream_plan::barrier::PbBarrierKind;
22
23pub(crate) mod creating_job;
24
25pub(crate) use creating_job::CreatingStreamingJobControl;
26
27use crate::barrier::info::BarrierInfo;
28use crate::barrier::notifier::Notifier;
29use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager};
30use crate::barrier::{BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch};
31use crate::controller::fragment::InflightFragmentInfo;
32
33fn new_fake_barrier(
35 prev_epoch_fake_physical_time: &mut u64,
36 pending_non_checkpoint_barriers: &mut Vec<u64>,
37 kind: PbBarrierKind,
38) -> BarrierInfo {
39 let prev_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
40 *prev_epoch_fake_physical_time += 1;
41 let curr_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
42 let kind = match kind {
43 PbBarrierKind::Unspecified => unreachable!(),
44 PbBarrierKind::Initial => {
45 assert!(pending_non_checkpoint_barriers.is_empty());
46 BarrierKind::Initial
47 }
48 PbBarrierKind::Barrier => {
49 pending_non_checkpoint_barriers.push(prev_epoch.value().0);
50 BarrierKind::Barrier
51 }
52 PbBarrierKind::Checkpoint => {
53 pending_non_checkpoint_barriers.push(prev_epoch.value().0);
54 BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
55 }
56 };
57 BarrierInfo {
58 prev_epoch,
59 curr_epoch,
60 kind,
61 }
62}
63
64pub(crate) enum IndependentCheckpointJobControl {
69 CreatingStreamingJob(CreatingStreamingJobControl),
70}
71
72impl IndependentCheckpointJobControl {
73 pub(crate) fn gen_backfill_progress(&self) -> BackfillProgress {
74 match self {
75 Self::CreatingStreamingJob(j) => j.gen_backfill_progress(),
76 }
77 }
78
79 pub(crate) fn is_snapshot_backfilling(&self) -> bool {
86 match self {
87 Self::CreatingStreamingJob(_) => true,
88 }
89 }
90
91 pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
93 match self {
94 Self::CreatingStreamingJob(j) => j.collect(collected_barrier),
95 }
96 }
97
98 pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
99 match self {
100 Self::CreatingStreamingJob(j) => j.gen_fragment_backfill_progress(),
101 }
102 }
103
104 pub(crate) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
105 match self {
106 Self::CreatingStreamingJob(j) => j.pinned_upstream_log_epoch(),
107 }
108 }
109
110 pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
111 match self {
112 Self::CreatingStreamingJob(j) => j.fragment_infos(),
113 }
114 }
115
116 pub(crate) fn ack_completed(
117 &mut self,
118 partial_graph_manager: &mut PartialGraphManager,
119 epoch: u64,
120 ) {
121 match self {
122 Self::CreatingStreamingJob(j) => j.ack_completed(partial_graph_manager, epoch),
123 }
124 }
125
126 pub(crate) fn on_partial_graph_reset(self) {
127 match self {
128 Self::CreatingStreamingJob(j) => j.on_partial_graph_reset(),
129 }
130 }
131
132 pub(crate) fn drop(
133 &mut self,
134 notifiers: &mut Vec<Notifier>,
135 partial_graph_manager: &mut PartialGraphManager,
136 ) -> bool {
137 match self {
138 Self::CreatingStreamingJob(j) => j.drop(notifiers, partial_graph_manager),
139 }
140 }
141
142 pub(crate) fn reset(self) -> bool {
147 match self {
148 Self::CreatingStreamingJob(j) => j.reset(),
149 }
150 }
151}