risingwave_meta/barrier/checkpoint/
state.rs1use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17use std::mem::take;
18
19use risingwave_common::catalog::{DatabaseId, TableId};
20use risingwave_common::id::JobId;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_pb::id::{ActorId, FragmentId, WorkerId};
23use risingwave_pb::stream_plan::barrier_mutation::Mutation;
24use tracing::warn;
25
26use crate::MetaResult;
27use crate::barrier::edge_builder::FragmentEdgeBuildResult;
28use crate::barrier::info::{
29 BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
30 SharedActorInfos, SubscriberType,
31};
32use crate::barrier::rpc::ControlStreamManager;
33use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
34use crate::controller::fragment::InflightFragmentInfo;
35use crate::model::StreamJobActorsToCreate;
36
37pub(in crate::barrier) struct BarrierWorkerState {
39 in_flight_prev_epoch: TracedEpoch,
44
45 pending_non_checkpoint_barriers: Vec<u64>,
47
48 pub(super) database_info: InflightDatabaseInfo,
50
51 is_paused: bool,
53}
54
55impl BarrierWorkerState {
56 pub(super) fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
57 Self {
58 in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
59 pending_non_checkpoint_barriers: vec![],
60 database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
61 is_paused: false,
62 }
63 }
64
65 pub fn recovery(
66 database_id: DatabaseId,
67 shared_actor_infos: SharedActorInfos,
68 in_flight_prev_epoch: TracedEpoch,
69 jobs: impl Iterator<Item = InflightStreamingJobInfo>,
70 is_paused: bool,
71 ) -> Self {
72 Self {
73 in_flight_prev_epoch,
74 pending_non_checkpoint_barriers: vec![],
75 database_info: InflightDatabaseInfo::recover(database_id, jobs, shared_actor_infos),
76 is_paused,
77 }
78 }
79
80 pub fn is_paused(&self) -> bool {
81 self.is_paused
82 }
83
84 fn set_is_paused(&mut self, is_paused: bool) {
85 if self.is_paused != is_paused {
86 tracing::info!(
87 currently_paused = self.is_paused,
88 newly_paused = is_paused,
89 "update paused state"
90 );
91 self.is_paused = is_paused;
92 }
93 }
94
95 pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
96 &self.in_flight_prev_epoch
97 }
98
99 pub fn next_barrier_info(
101 &mut self,
102 command: Option<&Command>,
103 is_checkpoint: bool,
104 curr_epoch: TracedEpoch,
105 ) -> Option<BarrierInfo> {
106 if self.database_info.is_empty()
107 && !matches!(&command, Some(Command::CreateStreamingJob { .. }))
108 {
109 return None;
110 };
111 assert!(
112 self.in_flight_prev_epoch.value() < curr_epoch.value(),
113 "curr epoch regress. {} > {}",
114 self.in_flight_prev_epoch.value(),
115 curr_epoch.value()
116 );
117 let prev_epoch = self.in_flight_prev_epoch.clone();
118 self.in_flight_prev_epoch = curr_epoch.clone();
119 self.pending_non_checkpoint_barriers
120 .push(prev_epoch.value().0);
121 let kind = if is_checkpoint {
122 let epochs = take(&mut self.pending_non_checkpoint_barriers);
123 BarrierKind::Checkpoint(epochs)
124 } else {
125 BarrierKind::Barrier
126 };
127 Some(BarrierInfo {
128 prev_epoch,
129 curr_epoch,
130 kind,
131 })
132 }
133}
134
135pub(super) struct ApplyCommandInfo {
136 pub node_actors: HashMap<WorkerId, HashSet<ActorId>>,
137 pub actors_to_create: Option<StreamJobActorsToCreate>,
138 pub mv_subscription_max_retention: HashMap<TableId, u64>,
139 pub table_ids_to_commit: HashSet<TableId>,
140 pub table_ids_to_sync: HashSet<TableId>,
141 pub jobs_to_wait: HashSet<JobId>,
142 pub mutation: Option<Mutation>,
143}
144
145impl BarrierWorkerState {
146 pub(super) fn apply_command(
149 &mut self,
150 command: Option<&Command>,
151 finished_snapshot_backfill_job_fragments: HashMap<
152 JobId,
153 HashMap<FragmentId, InflightFragmentInfo>,
154 >,
155 edges: &mut Option<FragmentEdgeBuildResult>,
156 control_stream_manager: &ControlStreamManager,
157 ) -> MetaResult<ApplyCommandInfo> {
158 let post_apply_changes = if let Some(Command::CreateStreamingJob {
160 job_type: CreateStreamingJobType::SnapshotBackfill(_),
161 ..
162 }) = command
163 {
164 None
165 } else if let Some((new_job, fragment_changes)) =
166 command.and_then(Command::fragment_changes)
167 {
168 Some(self.database_info.pre_apply(new_job, fragment_changes))
169 } else {
170 None
171 };
172
173 match &command {
174 Some(Command::CreateSubscription {
175 subscription_id,
176 upstream_mv_table_id,
177 retention_second,
178 }) => {
179 self.database_info.register_subscriber(
180 upstream_mv_table_id.as_job_id(),
181 subscription_id.as_subscriber_id(),
182 SubscriberType::Subscription(*retention_second),
183 );
184 }
185 Some(Command::CreateStreamingJob {
186 info,
187 job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
188 ..
189 }) => {
190 for upstream_mv_table_id in snapshot_backfill_info
191 .upstream_mv_table_id_to_backfill_epoch
192 .keys()
193 {
194 self.database_info.register_subscriber(
195 upstream_mv_table_id.as_job_id(),
196 info.streaming_job.id().as_subscriber_id(),
197 SubscriberType::SnapshotBackfill,
198 );
199 }
200 }
201 _ => {}
202 };
203
204 let mut table_ids_to_commit: HashSet<_> = self.database_info.existing_table_ids().collect();
205 let actors_to_create = command.as_ref().and_then(|command| {
206 command.actors_to_create(&self.database_info, edges, control_stream_manager)
207 });
208 let node_actors =
209 InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
210
211 if let Some(post_apply_changes) = post_apply_changes {
212 self.database_info.post_apply(post_apply_changes);
213 }
214
215 let mut jobs_to_wait = HashSet::new();
216 if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
217 assert!(
218 jobs_to_merge
219 .keys()
220 .all(|job_id| finished_snapshot_backfill_job_fragments.contains_key(job_id))
221 );
222 for (job_id, job_fragments) in finished_snapshot_backfill_job_fragments {
223 assert!(jobs_to_merge.contains_key(&job_id));
224 jobs_to_wait.insert(job_id);
225 table_ids_to_commit.extend(InflightFragmentInfo::existing_table_ids(
226 job_fragments.values(),
227 ));
228 self.database_info.add_existing(InflightStreamingJobInfo {
229 job_id,
230 fragment_infos: job_fragments,
231 subscribers: Default::default(), status: CreateStreamingJobStatus::Created,
233 cdc_table_backfill_tracker: None, });
235 }
236 } else {
237 assert!(finished_snapshot_backfill_job_fragments.is_empty());
238 }
239
240 match &command {
241 Some(Command::DropSubscription {
242 subscription_id,
243 upstream_mv_table_id,
244 }) => {
245 if self
246 .database_info
247 .unregister_subscriber(
248 upstream_mv_table_id.as_job_id(),
249 subscription_id.as_subscriber_id(),
250 )
251 .is_none()
252 {
253 warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
254 }
255 }
256 Some(Command::MergeSnapshotBackfillStreamingJobs(snapshot_backfill_jobs)) => {
257 for (snapshot_backfill_job_id, upstream_tables) in snapshot_backfill_jobs {
258 for upstream_mv_table_id in upstream_tables {
259 assert_matches!(
260 self.database_info.unregister_subscriber(
261 upstream_mv_table_id.as_job_id(),
262 snapshot_backfill_job_id.as_subscriber_id()
263 ),
264 Some(SubscriberType::SnapshotBackfill)
265 );
266 }
267 }
268 }
269 _ => {}
270 }
271
272 let prev_is_paused = self.is_paused();
273 let curr_is_paused = match command {
274 Some(Command::Pause) => true,
275 Some(Command::Resume) => false,
276 _ => prev_is_paused,
277 };
278 self.set_is_paused(curr_is_paused);
279
280 let table_ids_to_sync =
281 InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()).collect();
282
283 let mutation = if let Some(c) = &command {
284 c.to_mutation(
285 prev_is_paused,
286 edges,
287 control_stream_manager,
288 &mut self.database_info,
289 )?
290 } else {
291 None
292 };
293
294 Ok(ApplyCommandInfo {
295 node_actors,
296 actors_to_create,
297 mv_subscription_max_retention: self.database_info.max_subscription_retention(),
298 table_ids_to_commit,
299 table_ids_to_sync,
300 jobs_to_wait,
301 mutation,
302 })
303 }
304}