risingwave_meta/barrier/checkpoint/
state.rs1use std::collections::HashSet;
16use std::mem::take;
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::Epoch;
20
21use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightSubscriptionInfo};
22use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
23
24pub(crate) struct BarrierWorkerState {
26 in_flight_prev_epoch: TracedEpoch,
31
32 pending_non_checkpoint_barriers: Vec<u64>,
34
35 pub(super) inflight_graph_info: InflightDatabaseInfo,
37
38 pub(super) inflight_subscription_info: InflightSubscriptionInfo,
39
40 is_paused: bool,
42}
43
44impl BarrierWorkerState {
45 pub(super) fn new() -> Self {
46 Self {
47 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
48 pending_non_checkpoint_barriers: vec![],
49 inflight_graph_info: InflightDatabaseInfo::empty(),
50 inflight_subscription_info: InflightSubscriptionInfo::default(),
51 is_paused: false,
52 }
53 }
54
55 pub fn recovery(
56 in_flight_prev_epoch: TracedEpoch,
57 inflight_graph_info: InflightDatabaseInfo,
58 inflight_subscription_info: InflightSubscriptionInfo,
59 is_paused: bool,
60 ) -> Self {
61 Self {
62 in_flight_prev_epoch,
63 pending_non_checkpoint_barriers: vec![],
64 inflight_graph_info,
65 inflight_subscription_info,
66 is_paused,
67 }
68 }
69
70 pub fn is_paused(&self) -> bool {
71 self.is_paused
72 }
73
74 fn set_is_paused(&mut self, is_paused: bool) {
75 if self.is_paused != is_paused {
76 tracing::info!(
77 currently_paused = self.is_paused,
78 newly_paused = is_paused,
79 "update paused state"
80 );
81 self.is_paused = is_paused;
82 }
83 }
84
85 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
86 &self.in_flight_prev_epoch
87 }
88
89 pub fn next_barrier_info(
91 &mut self,
92 command: Option<&Command>,
93 is_checkpoint: bool,
94 curr_epoch: TracedEpoch,
95 ) -> Option<BarrierInfo> {
96 if self.inflight_graph_info.is_empty()
97 && !matches!(&command, Some(Command::CreateStreamingJob { .. }))
98 {
99 return None;
100 };
101 assert!(
102 self.in_flight_prev_epoch.value() < curr_epoch.value(),
103 "curr epoch regress. {} > {}",
104 self.in_flight_prev_epoch.value(),
105 curr_epoch.value()
106 );
107 let prev_epoch = self.in_flight_prev_epoch.clone();
108 self.in_flight_prev_epoch = curr_epoch.clone();
109 self.pending_non_checkpoint_barriers
110 .push(prev_epoch.value().0);
111 let kind = if is_checkpoint {
112 let epochs = take(&mut self.pending_non_checkpoint_barriers);
113 BarrierKind::Checkpoint(epochs)
114 } else {
115 BarrierKind::Barrier
116 };
117 Some(BarrierInfo {
118 prev_epoch,
119 curr_epoch,
120 kind,
121 })
122 }
123
124 pub fn apply_command(
129 &mut self,
130 command: Option<&Command>,
131 ) -> (
132 InflightDatabaseInfo,
133 InflightSubscriptionInfo,
134 HashSet<TableId>,
135 HashSet<TableId>,
136 bool,
137 ) {
138 let fragment_changes = if let Some(Command::CreateStreamingJob {
140 job_type: CreateStreamingJobType::SnapshotBackfill(_),
141 ..
142 }) = command
143 {
144 None
145 } else if let Some(fragment_changes) = command.and_then(Command::fragment_changes) {
146 self.inflight_graph_info.pre_apply(&fragment_changes);
147 Some(fragment_changes)
148 } else {
149 None
150 };
151 if let Some(command) = &command {
152 self.inflight_subscription_info.pre_apply(command);
153 }
154
155 let info = self.inflight_graph_info.clone();
156 let subscription_info = self.inflight_subscription_info.clone();
157
158 if let Some(fragment_changes) = fragment_changes {
159 self.inflight_graph_info.post_apply(&fragment_changes);
160 }
161
162 let mut table_ids_to_commit: HashSet<_> = info.existing_table_ids().collect();
163 let mut jobs_to_wait = HashSet::new();
164 if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
165 for (table_id, (_, graph_info)) in jobs_to_merge {
166 jobs_to_wait.insert(*table_id);
167 table_ids_to_commit.extend(graph_info.existing_table_ids());
168 self.inflight_graph_info.extend(graph_info.clone());
169 }
170 }
171
172 if let Some(command) = command {
173 self.inflight_subscription_info.post_apply(command);
174 }
175
176 let prev_is_paused = self.is_paused();
177 let curr_is_paused = match command {
178 Some(Command::Pause) => true,
179 Some(Command::Resume) => false,
180 _ => prev_is_paused,
181 };
182 self.set_is_paused(curr_is_paused);
183
184 (
185 info,
186 subscription_info,
187 table_ids_to_commit,
188 jobs_to_wait,
189 prev_is_paused,
190 )
191 }
192}