risingwave_meta/barrier/checkpoint/independent_job/
mod.rs1use std::collections::{HashMap, HashSet};
16use std::mem::take;
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::Epoch;
20use risingwave_pb::id::FragmentId;
21use risingwave_pb::stream_plan::barrier::PbBarrierKind;
22
23pub(crate) mod batch_refresh_job;
24pub(crate) mod creating_job;
25
26pub(crate) use batch_refresh_job::{
27 BatchRefreshJobCheckpointControl, BatchRefreshLogicalFragments, BatchRefreshRenderResult,
28};
29pub(crate) use creating_job::CreatingStreamingJobControl;
30
31use crate::barrier::info::BarrierInfo;
32use crate::barrier::notifier::Notifier;
33use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager};
34use crate::barrier::{BackfillProgress, BarrierKind, FragmentBackfillProgress, TracedEpoch};
35use crate::controller::fragment::InflightFragmentInfo;
36
37fn new_fake_barrier(
41 prev_epoch_fake_physical_time: &mut u64,
42 pending_non_checkpoint_barriers: &mut Vec<u64>,
43 kind: PbBarrierKind,
44) -> BarrierInfo {
45 let prev_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
46 *prev_epoch_fake_physical_time += 1;
47 let curr_epoch = TracedEpoch::new(Epoch::from_physical_time(*prev_epoch_fake_physical_time));
48 let kind = match kind {
49 PbBarrierKind::Unspecified => unreachable!(),
50 PbBarrierKind::Initial => {
51 assert!(pending_non_checkpoint_barriers.is_empty());
52 BarrierKind::Initial
53 }
54 PbBarrierKind::Barrier => {
55 pending_non_checkpoint_barriers.push(prev_epoch.value().0);
56 BarrierKind::Barrier
57 }
58 PbBarrierKind::Checkpoint => {
59 pending_non_checkpoint_barriers.push(prev_epoch.value().0);
60 BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers))
61 }
62 };
63 BarrierInfo {
64 prev_epoch,
65 curr_epoch,
66 kind,
67 }
68}
69
70pub(crate) enum IndependentCheckpointJobControl {
75 CreatingStreamingJob(CreatingStreamingJobControl),
76 BatchRefresh(BatchRefreshJobCheckpointControl),
77}
78
79impl IndependentCheckpointJobControl {
80 pub(crate) fn gen_backfill_progress(&self) -> Option<BackfillProgress> {
81 match self {
82 Self::CreatingStreamingJob(j) => Some(j.gen_backfill_progress()),
83 Self::BatchRefresh(j) => j.gen_backfill_progress(),
84 }
85 }
86
87 pub(crate) fn is_snapshot_backfilling(&self) -> bool {
94 match self {
95 Self::CreatingStreamingJob(_) => true,
96 Self::BatchRefresh(j) => j.is_snapshot_backfilling(),
97 }
98 }
99
100 pub(crate) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) -> bool {
102 match self {
103 Self::CreatingStreamingJob(j) => j.collect(collected_barrier),
104 Self::BatchRefresh(j) => j.collect(collected_barrier),
105 }
106 }
107
108 pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
109 match self {
110 Self::CreatingStreamingJob(j) => j.gen_fragment_backfill_progress(),
111 Self::BatchRefresh(j) => j.gen_fragment_backfill_progress(),
112 }
113 }
114
115 pub(crate) fn pinned_upstream_log_epoch(&self) -> (u64, HashSet<TableId>) {
116 match self {
117 Self::CreatingStreamingJob(j) => j.pinned_upstream_log_epoch(),
118 Self::BatchRefresh(j) => j.pinned_upstream_log_epoch(),
119 }
120 }
121
122 pub(crate) fn fragment_infos(&self) -> Option<&HashMap<FragmentId, InflightFragmentInfo>> {
123 match self {
124 Self::CreatingStreamingJob(j) => j.fragment_infos(),
125 Self::BatchRefresh(j) => j.fragment_infos(),
126 }
127 }
128
129 pub(crate) fn ack_completed(
130 &mut self,
131 partial_graph_manager: &mut PartialGraphManager,
132 epoch: u64,
133 ) {
134 match self {
135 Self::CreatingStreamingJob(j) => j.ack_completed(partial_graph_manager, epoch),
136 Self::BatchRefresh(j) => j.ack_completed(partial_graph_manager, epoch),
137 }
138 }
139
140 pub(crate) fn on_partial_graph_reset(self) {
141 match self {
142 Self::CreatingStreamingJob(j) => j.on_partial_graph_reset(),
143 Self::BatchRefresh(j) => j.on_partial_graph_reset(),
144 }
145 }
146
147 pub(crate) fn drop(
148 &mut self,
149 notifiers: &mut Vec<Notifier>,
150 partial_graph_manager: &mut PartialGraphManager,
151 ) -> bool {
152 match self {
153 Self::CreatingStreamingJob(j) => j.drop(notifiers, partial_graph_manager),
154 Self::BatchRefresh(j) => j.drop(notifiers, partial_graph_manager),
155 }
156 }
157
158 pub(crate) fn reset(self) -> bool {
163 match self {
164 Self::CreatingStreamingJob(j) => j.reset(),
165 Self::BatchRefresh(j) => j.reset(),
166 }
167 }
168}