risingwave_meta/barrier/checkpoint/
state.rs1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17use std::mem::take;
18
19use risingwave_common::catalog::{DatabaseId, TableId};
20use risingwave_common::id::JobId;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_pb::id::{ActorId, FragmentId, WorkerId};
23use risingwave_pb::stream_plan::barrier_mutation::Mutation;
24use tracing::warn;
25
26use crate::barrier::edge_builder::FragmentEdgeBuildResult;
27use crate::barrier::info::{
28 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
29 SharedActorInfos, SubscriberType,
30};
31use crate::barrier::rpc::ControlStreamManager;
32use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
33use crate::controller::fragment::InflightFragmentInfo;
34use crate::model::StreamJobActorsToCreate;
35
36pub(in crate::barrier) struct BarrierWorkerState {
38 in_flight_prev_epoch: TracedEpoch,
43
44 pending_non_checkpoint_barriers: Vec<u64>,
46
47 pub(super) database_info: InflightDatabaseInfo,
49
50 is_paused: bool,
52}
53
54impl BarrierWorkerState {
55 pub(super) fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
56 Self {
57 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
58 pending_non_checkpoint_barriers: vec![],
59 database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
60 is_paused: false,
61 }
62 }
63
64 pub fn recovery(
65 database_id: DatabaseId,
66 shared_actor_infos: SharedActorInfos,
67 in_flight_prev_epoch: TracedEpoch,
68 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
69 is_paused: bool,
70 ) -> Self {
71 Self {
72 in_flight_prev_epoch,
73 pending_non_checkpoint_barriers: vec![],
74 database_info: InflightDatabaseInfo::recover(database_id, jobs, shared_actor_infos),
75 is_paused,
76 }
77 }
78
79 pub fn is_paused(&self) -> bool {
80 self.is_paused
81 }
82
83 fn set_is_paused(&mut self, is_paused: bool) {
84 if self.is_paused != is_paused {
85 tracing::info!(
86 currently_paused = self.is_paused,
87 newly_paused = is_paused,
88 "update paused state"
89 );
90 self.is_paused = is_paused;
91 }
92 }
93
94 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
95 &self.in_flight_prev_epoch
96 }
97
98 pub fn next_barrier_info(
100 &mut self,
101 command: Option<&Command>,
102 is_checkpoint: bool,
103 curr_epoch: TracedEpoch,
104 ) -> Option<BarrierInfo> {
105 if self.database_info.is_empty()
106 && !matches!(&command, Some(Command::CreateStreamingJob { .. }))
107 {
108 return None;
109 };
110 assert!(
111 self.in_flight_prev_epoch.value() < curr_epoch.value(),
112 "curr epoch regress. {} > {}",
113 self.in_flight_prev_epoch.value(),
114 curr_epoch.value()
115 );
116 let prev_epoch = self.in_flight_prev_epoch.clone();
117 self.in_flight_prev_epoch = curr_epoch.clone();
118 self.pending_non_checkpoint_barriers
119 .push(prev_epoch.value().0);
120 let kind = if is_checkpoint {
121 let epochs = take(&mut self.pending_non_checkpoint_barriers);
122 BarrierKind::Checkpoint(epochs)
123 } else {
124 BarrierKind::Barrier
125 };
126 Some(BarrierInfo {
127 prev_epoch,
128 curr_epoch,
129 kind,
130 })
131 }
132}
133
134pub(super) struct ApplyCommandInfo {
135 pub node_actors: HashMap<WorkerId, HashSet<ActorId>>,
136 pub actors_to_create: Option<StreamJobActorsToCreate>,
137 pub mv_subscription_max_retention: HashMap<TableId, u64>,
138 pub table_ids_to_commit: HashSet<TableId>,
139 pub table_ids_to_sync: HashSet<TableId>,
140 pub jobs_to_wait: HashSet<JobId>,
141 pub mutation: Option<Mutation>,
142}
143
144impl BarrierWorkerState {
145 pub(super) fn apply_command(
148 &mut self,
149 command: Option<&Command>,
150 finished_snapshot_backfill_job_fragments: HashMap<
151 JobId,
152 HashMap<FragmentId, InflightFragmentInfo>,
153 >,
154 edges: &mut Option<FragmentEdgeBuildResult>,
155 control_stream_manager: &ControlStreamManager,
156 ) -> ApplyCommandInfo {
157 let post_apply_changes = if let Some(Command::CreateStreamingJob {
159 job_type: CreateStreamingJobType::SnapshotBackfill(_),
160 ..
161 }) = command
162 {
163 None
164 } else if let Some((new_job_id, fragment_changes)) =
165 command.and_then(Command::fragment_changes)
166 {
167 Some(self.database_info.pre_apply(new_job_id, fragment_changes))
168 } else {
169 None
170 };
171
172 match &command {
173 Some(Command::CreateSubscription {
174 subscription_id,
175 upstream_mv_table_id,
176 retention_second,
177 }) => {
178 self.database_info.register_subscriber(
179 upstream_mv_table_id.as_job_id(),
180 subscription_id.as_subscriber_id(),
181 SubscriberType::Subscription(*retention_second),
182 );
183 }
184 Some(Command::CreateStreamingJob {
185 info,
186 job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
187 ..
188 }) => {
189 for upstream_mv_table_id in snapshot_backfill_info
190 .upstream_mv_table_id_to_backfill_epoch
191 .keys()
192 {
193 self.database_info.register_subscriber(
194 upstream_mv_table_id.as_job_id(),
195 info.streaming_job.id().as_subscriber_id(),
196 SubscriberType::SnapshotBackfill,
197 );
198 }
199 }
200 _ => {}
201 };
202
203 let mut table_ids_to_commit: HashSet<_> = self.database_info.existing_table_ids().collect();
204 let actors_to_create = command.as_ref().and_then(|command| {
205 command.actors_to_create(&self.database_info, edges, control_stream_manager)
206 });
207 let node_actors =
208 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
209
210 if let Some(post_apply_changes) = post_apply_changes {
211 self.database_info.post_apply(post_apply_changes);
212 }
213
214 let mut jobs_to_wait = HashSet::new();
215 if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
216 assert!(
217 jobs_to_merge
218 .keys()
219 .all(|job_id| finished_snapshot_backfill_job_fragments.contains_key(job_id))
220 );
221 for (job_id, job_fragments) in finished_snapshot_backfill_job_fragments {
222 assert!(jobs_to_merge.contains_key(&job_id));
223 jobs_to_wait.insert(job_id);
224 table_ids_to_commit.extend(InflightFragmentInfo::existing_table_ids(
225 job_fragments.values(),
226 ));
227 self.database_info.add_existing(InflightStreamingJobInfo {
228 job_id,
229 fragment_infos: job_fragments,
230 subscribers: Default::default(), status: CreateStreamingJobStatus::Created,
232 });
233 }
234 } else {
235 assert!(finished_snapshot_backfill_job_fragments.is_empty());
236 }
237
238 match &command {
239 Some(Command::DropSubscription {
240 subscription_id,
241 upstream_mv_table_id,
242 }) => {
243 if self
244 .database_info
245 .unregister_subscriber(
246 upstream_mv_table_id.as_job_id(),
247 subscription_id.as_subscriber_id(),
248 )
249 .is_none()
250 {
251 warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
252 }
253 }
254 Some(Command::MergeSnapshotBackfillStreamingJobs(snapshot_backfill_jobs)) => {
255 for (snapshot_backfill_job_id, upstream_tables) in snapshot_backfill_jobs {
256 for upstream_mv_table_id in upstream_tables {
257 assert_matches!(
258 self.database_info.unregister_subscriber(
259 upstream_mv_table_id.as_job_id(),
260 snapshot_backfill_job_id.as_subscriber_id()
261 ),
262 Some(SubscriberType::SnapshotBackfill)
263 );
264 }
265 }
266 }
267 _ => {}
268 }
269
270 let prev_is_paused = self.is_paused();
271 let curr_is_paused = match command {
272 Some(Command::Pause) => true,
273 Some(Command::Resume) => false,
274 _ => prev_is_paused,
275 };
276 self.set_is_paused(curr_is_paused);
277
278 let table_ids_to_sync =
279 InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()).collect();
280
281 let mutation = command
282 .as_ref()
283 .and_then(|c| c.to_mutation(prev_is_paused, edges, control_stream_manager));
284
285 ApplyCommandInfo {
286 node_actors,
287 actors_to_create,
288 mv_subscription_max_retention: self.database_info.max_subscription_retention(),
289 table_ids_to_commit,
290 table_ids_to_sync,
291 jobs_to_wait,
292 mutation,
293 }
294 }
295}