risingwave_meta/barrier/checkpoint/independent_job/
mod.rs1use std::collections::{HashMap, HashSet};
16use std::mem::take;
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::Epoch;
20use risingwave_pb::id::FragmentId;
21use risingwave_pb::stream_plan::barrier::PbBarrierKind;
22
23pub(crate) mod batch_refresh_job;
24pub(crate) mod creating_job;
25
26pub(crate) use batch_refresh_job::{
27 BatchRefreshJobCheckpointControl, BatchRefreshJobTriggerContext, BatchRefreshLogicalFragments,
28 BatchRefreshRenderResult,
29};
30pub(crate) use creating_job::CreatingStreamingJobControl;
31
32use crate::barrier::info::BarrierInfo;
33use crate::barrier::notifier::Notifier;
34use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager};
35use crate::barrier::{BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch};
36use crate::controller::fragment::InflightFragmentInfo;
37
38fn new_fake_barrier(
42 prev_epoch_fake_physical_time: &mut u64,
43 pending_non_checkpoint_barriers: &mut Vec<u64>,
44 kind: PbBarrierKind,
45) -> BarrierInfo {
46 let prev_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
47 *prev_epoch_fake_physical_time += 1;
48 let curr_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
49 let kind = match kind {
50 PbBarrierKind::Unspecified => unreachable!(),
51 PbBarrierKind::Initial => {
52 assert!(pending_non_checkpoint_barriers.is_empty());
53 BarrierKind::Initial
54 }
55 PbBarrierKind::Barrier => {
56 pending_non_checkpoint_barriers.push(prev_epoch.value().0);
57 BarrierKind::Barrier
58 }
59 PbBarrierKind::Checkpoint => {
60 pending_non_checkpoint_barriers.push(prev_epoch.value().0);
61 BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
62 }
63 };
64 BarrierInfo {
65 prev_epoch,
66 curr_epoch,
67 kind,
68 }
69}
70
71pub(crate) enum IndependentCheckpointJobControl {
76 CreatingStreamingJob(CreatingStreamingJobControl),
77 BatchRefresh(BatchRefreshJobCheckpointControl),
78}
79
80impl IndependentCheckpointJobControl {
81 pub(crate) fn gen_backfill_progress(&self) -> Option<BackfillProgress> {
82 match self {
83 Self::CreatingStreamingJob(j) => Some(j.gen_backfill_progress()),
84 Self::BatchRefresh(j) => j.gen_backfill_progress(),
85 }
86 }
87
88 pub(crate) fn is_snapshot_backfilling(&self) -> bool {
95 match self {
96 Self::CreatingStreamingJob(_) => true,
97 Self::BatchRefresh(j) => j.is_snapshot_backfilling(),
98 }
99 }
100
101 pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
103 match self {
104 Self::CreatingStreamingJob(j) => j.collect(collected_barrier),
105 Self::BatchRefresh(j) => j.collect(collected_barrier),
106 }
107 }
108
109 pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
110 match self {
111 Self::CreatingStreamingJob(j) => j.gen_fragment_backfill_progress(),
112 Self::BatchRefresh(j) => j.gen_fragment_backfill_progress(),
113 }
114 }
115
116 pub(crate) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
117 match self {
118 Self::CreatingStreamingJob(j) => j.pinned_upstream_log_epoch(),
119 Self::BatchRefresh(j) => j.pinned_upstream_log_epoch(),
120 }
121 }
122
123 pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
124 match self {
125 Self::CreatingStreamingJob(j) => j.fragment_infos(),
126 Self::BatchRefresh(j) => j.fragment_infos(),
127 }
128 }
129
130 pub(crate) fn ack_completed(
131 &mut self,
132 partial_graph_manager: &mut PartialGraphManager,
133 epoch: u64,
134 ) {
135 match self {
136 Self::CreatingStreamingJob(j) => j.ack_completed(partial_graph_manager, epoch),
137 Self::BatchRefresh(j) => j.ack_completed(partial_graph_manager, epoch),
138 }
139 }
140
141 pub(crate) fn on_partial_graph_reset(self) {
142 match self {
143 Self::CreatingStreamingJob(j) => j.on_partial_graph_reset(),
144 Self::BatchRefresh(j) => j.on_partial_graph_reset(),
145 }
146 }
147
148 pub(crate) fn drop(
149 &mut self,
150 notifiers: &mut Vec<Notifier>,
151 partial_graph_manager: &mut PartialGraphManager,
152 ) -> bool {
153 match self {
154 Self::CreatingStreamingJob(j) => j.drop(notifiers, partial_graph_manager),
155 Self::BatchRefresh(j) => j.drop(notifiers, partial_graph_manager),
156 }
157 }
158
159 pub(crate) fn reset(self) -> bool {
164 match self {
165 Self::CreatingStreamingJob(j) => j.reset(),
166 Self::BatchRefresh(j) => j.reset(),
167 }
168 }
169}