risingwave_meta/barrier/checkpoint/
state.rs1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17use std::mem::take;
18
19use risingwave_common::catalog::{DatabaseId, TableId};
20use risingwave_common::id::JobId;
21use risingwave_common::util::epoch::Epoch;
22use tracing::warn;
23
24use crate::barrier::info::{
25 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
26 SharedActorInfos, SubscriberType,
27};
28use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
29use crate::controller::fragment::InflightFragmentInfo;
30
31pub(crate) struct BarrierWorkerState {
33 in_flight_prev_epoch: TracedEpoch,
38
39 pending_non_checkpoint_barriers: Vec<u64>,
41
42 pub(super) inflight_graph_info: InflightDatabaseInfo,
44
45 is_paused: bool,
47}
48
49impl BarrierWorkerState {
50 pub(super) fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
51 Self {
52 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
53 pending_non_checkpoint_barriers: vec![],
54 inflight_graph_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
55 is_paused: false,
56 }
57 }
58
59 pub fn recovery(
60 database_id: DatabaseId,
61 shared_actor_infos: SharedActorInfos,
62 in_flight_prev_epoch: TracedEpoch,
63 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
64 is_paused: bool,
65 ) -> Self {
66 Self {
67 in_flight_prev_epoch,
68 pending_non_checkpoint_barriers: vec![],
69 inflight_graph_info: InflightDatabaseInfo::recover(
70 database_id,
71 jobs,
72 shared_actor_infos,
73 ),
74 is_paused,
75 }
76 }
77
78 pub fn is_paused(&self) -> bool {
79 self.is_paused
80 }
81
82 fn set_is_paused(&mut self, is_paused: bool) {
83 if self.is_paused != is_paused {
84 tracing::info!(
85 currently_paused = self.is_paused,
86 newly_paused = is_paused,
87 "update paused state"
88 );
89 self.is_paused = is_paused;
90 }
91 }
92
93 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
94 &self.in_flight_prev_epoch
95 }
96
97 pub fn next_barrier_info(
99 &mut self,
100 command: Option<&Command>,
101 is_checkpoint: bool,
102 curr_epoch: TracedEpoch,
103 ) -> Option<BarrierInfo> {
104 if self.inflight_graph_info.is_empty()
105 && !matches!(&command, Some(Command::CreateStreamingJob { .. }))
106 {
107 return None;
108 };
109 assert!(
110 self.in_flight_prev_epoch.value() < curr_epoch.value(),
111 "curr epoch regress. {} > {}",
112 self.in_flight_prev_epoch.value(),
113 curr_epoch.value()
114 );
115 let prev_epoch = self.in_flight_prev_epoch.clone();
116 self.in_flight_prev_epoch = curr_epoch.clone();
117 self.pending_non_checkpoint_barriers
118 .push(prev_epoch.value().0);
119 let kind = if is_checkpoint {
120 let epochs = take(&mut self.pending_non_checkpoint_barriers);
121 BarrierKind::Checkpoint(epochs)
122 } else {
123 BarrierKind::Barrier
124 };
125 Some(BarrierInfo {
126 prev_epoch,
127 curr_epoch,
128 kind,
129 })
130 }
131
132 pub fn apply_command(
137 &mut self,
138 command: Option<&Command>,
139 ) -> (
140 InflightDatabaseInfo,
141 HashMap<TableId, u64>,
142 HashSet<TableId>,
143 HashSet<JobId>,
144 bool,
145 ) {
146 let fragment_changes = if let Some(Command::CreateStreamingJob {
148 job_type: CreateStreamingJobType::SnapshotBackfill(_),
149 ..
150 }) = command
151 {
152 None
153 } else if let Some((new_job_id, fragment_changes)) =
154 command.and_then(Command::fragment_changes)
155 {
156 self.inflight_graph_info
157 .pre_apply(new_job_id, &fragment_changes);
158 Some(fragment_changes)
159 } else {
160 None
161 };
162
163 match &command {
164 Some(Command::CreateSubscription {
165 subscription_id,
166 upstream_mv_table_id,
167 retention_second,
168 }) => {
169 self.inflight_graph_info.register_subscriber(
170 upstream_mv_table_id.as_job_id(),
171 subscription_id.as_raw_id(),
172 SubscriberType::Subscription(*retention_second),
173 );
174 }
175 Some(Command::CreateStreamingJob {
176 info,
177 job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
178 ..
179 }) => {
180 for upstream_mv_table_id in snapshot_backfill_info
181 .upstream_mv_table_id_to_backfill_epoch
182 .keys()
183 {
184 self.inflight_graph_info.register_subscriber(
185 upstream_mv_table_id.as_job_id(),
186 info.streaming_job.id().as_raw_id(),
187 SubscriberType::SnapshotBackfill,
188 );
189 }
190 }
191 _ => {}
192 };
193
194 let info = self.inflight_graph_info.clone();
195
196 if let Some(fragment_changes) = fragment_changes {
197 self.inflight_graph_info.post_apply(&fragment_changes);
198 }
199
200 let mut table_ids_to_commit: HashSet<_> = info.existing_table_ids().collect();
201 let mut jobs_to_wait = HashSet::new();
202 if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
203 for (&job_id, (_, graph_info)) in jobs_to_merge {
204 jobs_to_wait.insert(job_id);
205 table_ids_to_commit.extend(InflightFragmentInfo::existing_table_ids(
206 graph_info.values(),
207 ));
208 self.inflight_graph_info.add_existing(InflightStreamingJobInfo {
209 job_id,
210 fragment_infos: graph_info.clone(),
211 subscribers: Default::default(), status: CreateStreamingJobStatus::Created,
213 });
214 }
215 }
216
217 match &command {
218 Some(Command::DropSubscription {
219 subscription_id,
220 upstream_mv_table_id,
221 }) => {
222 if self
223 .inflight_graph_info
224 .unregister_subscriber(
225 upstream_mv_table_id.as_job_id(),
226 subscription_id.as_raw_id(),
227 )
228 .is_none()
229 {
230 warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
231 }
232 }
233 Some(Command::MergeSnapshotBackfillStreamingJobs(snapshot_backfill_jobs)) => {
234 for (snapshot_backfill_job_id, (upstream_mv_table_ids, _)) in snapshot_backfill_jobs
235 {
236 for upstream_mv_table_id in upstream_mv_table_ids {
237 assert_matches!(
238 self.inflight_graph_info.unregister_subscriber(
239 upstream_mv_table_id.as_job_id(),
240 snapshot_backfill_job_id.as_raw_id()
241 ),
242 Some(SubscriberType::SnapshotBackfill)
243 );
244 }
245 }
246 }
247 _ => {}
248 }
249
250 let prev_is_paused = self.is_paused();
251 let curr_is_paused = match command {
252 Some(Command::Pause) => true,
253 Some(Command::Resume) => false,
254 _ => prev_is_paused,
255 };
256 self.set_is_paused(curr_is_paused);
257
258 (
259 info,
260 self.inflight_graph_info.max_subscription_retention(),
261 table_ids_to_commit,
262 jobs_to_wait,
263 prev_is_paused,
264 )
265 }
266}