use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};
use std::mem::take;

use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{
    PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation, ThrottleMutation,
};
use tracing::warn;

use crate::MetaResult;
use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
use crate::barrier::command::PostCollectCommand;
use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightStreamingJobInfo, SubscriberType,
};
use crate::barrier::notifier::Notifier;
use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
use crate::barrier::utils::NodeToCollect;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;
use crate::stream::{GlobalActorIdGen, fill_snapshot_backfill_epoch};

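/// Per-database barrier issuance state: the `prev_epoch` of the barrier most recently
/// injected (and still in flight), the epochs of non-checkpoint barriers accumulated since
/// the last checkpoint, and whether the streaming graph is currently paused.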
pub(in crate::barrier) struct BarrierWorkerState {
    /// The `prev_epoch` of the latest injected barrier; it becomes the `prev_epoch` of the
    /// next barrier to inject.
    in_flight_prev_epoch: TracedEpoch,

    /// `prev_epoch` values of barriers injected since the last checkpoint barrier, drained
    /// into `BarrierKind::Checkpoint` when the next checkpoint barrier is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the streaming graph is currently paused.
    is_paused: bool,
}

impl BarrierWorkerState {
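    /// Initial state: start issuing barriers from the current physical epoch, unpaused.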
    pub(super) fn new() -> Self {
        Self {
            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
            pending_non_checkpoint_barriers: vec![],
            is_paused: false,
        }
    }

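    /// Rebuild the state after recovery from a known in-flight `prev_epoch`, keeping the
    /// paused flag that was in effect before the failure.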
    pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
        Self {
            in_flight_prev_epoch,
            pending_non_checkpoint_barriers: vec![],
            is_paused,
        }
    }

    pub fn is_paused(&self) -> bool {
        self.is_paused
    }

    fn set_is_paused(&mut self, is_paused: bool) {
        if self.is_paused != is_paused {
            tracing::info!(
                currently_paused = self.is_paused,
                newly_paused = is_paused,
                "update paused state"
            );
            self.is_paused = is_paused;
        }
    }

    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
        &self.in_flight_prev_epoch
    }

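    /// Advance to `curr_epoch` and return the `BarrierInfo` for the next barrier to inject.
    ///
    /// The previous in-flight epoch becomes the new barrier's `prev_epoch` and is buffered in
    /// `pending_non_checkpoint_barriers`; a checkpoint barrier drains the buffer and carries
    /// every epoch it commits. A minimal sketch of the intended call pattern (hypothetical
    /// epochs; assumes `TracedEpoch::next` yields a strictly larger epoch):
    ///
    /// ```ignore
    /// let mut state = BarrierWorkerState::new();
    /// let curr = state.in_flight_prev_epoch().next();
    /// let info = state.next_barrier_info(false, curr);
    /// assert_matches!(info.kind, BarrierKind::Barrier);
    /// let curr = state.in_flight_prev_epoch().next();
    /// let info = state.next_barrier_info(true, curr);
    /// // The checkpoint barrier carries all pending non-checkpoint epochs.
    /// assert_matches!(info.kind, BarrierKind::Checkpoint(_));
    /// ```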
    pub fn next_barrier_info(
        &mut self,
        is_checkpoint: bool,
        curr_epoch: TracedEpoch,
    ) -> BarrierInfo {
        assert!(
            self.in_flight_prev_epoch.value() < curr_epoch.value(),
            "curr epoch regress: {} >= {}",
            self.in_flight_prev_epoch.value(),
            curr_epoch.value()
        );
        let prev_epoch = self.in_flight_prev_epoch.clone();
        self.in_flight_prev_epoch = curr_epoch.clone();
        self.pending_non_checkpoint_barriers
            .push(prev_epoch.value().0);
        let kind = if is_checkpoint {
            let epochs = take(&mut self.pending_non_checkpoint_barriers);
            BarrierKind::Checkpoint(epochs)
        } else {
            BarrierKind::Barrier
        };
        BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind,
        }
    }
}

/// Everything the caller needs after a command has been applied and its barrier injected:
/// what to commit, whom to wait for, and the post-collect form of the command.
pub(super) struct ApplyCommandInfo {
    /// For each upstream MV table, the largest retention demanded by its subscribers,
    /// recomputed after applying the command.
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    /// State tables whose changes should be committed at this barrier.
    pub table_ids_to_commit: HashSet<TableId>,
    /// Snapshot-backfill jobs that finish at this barrier and must be awaited.
    pub jobs_to_wait: HashSet<JobId>,
    /// Worker nodes the injected barrier must be collected from.
    pub node_to_collect: NodeToCollect,
    /// The command in its post-collect form (a plain barrier if there was no command).
    pub command: PostCollectCommand,
}

impl DatabaseCheckpointControl {
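    /// Apply `command` to the in-flight graph state and inject the corresponding barrier
    /// through `control_stream_manager`, returning the bookkeeping the checkpoint control
    /// needs to collect and commit it.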
    pub(super) fn apply_command(
        &mut self,
        mut command: Option<Command>,
        notifiers: &mut Vec<Notifier>,
        barrier_info: &BarrierInfo,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<ApplyCommandInfo> {
        let mut edges = self
            .database_info
            .build_edge(command.as_ref(), &*control_stream_manager);

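        // For job creation, resolve backfill epochs up front. Snapshot-backfill jobs
        // additionally get their own `CreatingStreamingJobControl` with a dedicated partial
        // graph, driven separately until they merge back into the upstream graph.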
        if let &mut Some(Command::CreateStreamingJob {
            ref mut job_type,
            ref mut info,
            ref cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    assert!(!self.state.is_paused());
                    let snapshot_epoch = barrier_info.prev_epoch();
                    // Stamp the snapshot epoch into every upstream table's backfill-epoch
                    // slot; each slot must still be unset at this point.
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(snapshot_epoch),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

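                    // Hand the job off to a dedicated control with its own partial graph; it
                    // consumes the pending notifiers and is tracked until it finishes
                    // backfilling and merges back into the upstream graph.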
                    let job = CreatingStreamingJobControl::new(
                        CreateSnapshotBackfillJobCommandInfo {
                            info: info.clone(),
                            snapshot_backfill_info: snapshot_backfill_info.clone(),
                            cross_db_snapshot_backfill_info: cross_db_snapshot_backfill_info
                                .clone(),
                        },
                        take(notifiers),
                        snapshot_backfill_upstream_tables,
                        snapshot_epoch,
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.database_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.fragment_infos_with_job_id());

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

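        // Snapshot-backfill jobs live in their own partial graph, so they don't change this
        // database's fragment set here; all other commands pre-apply their fragment changes.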
        let post_apply_changes = if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = command
        {
            None
        } else if let Some((new_job, fragment_changes)) =
            command.as_ref().and_then(Command::fragment_changes)
        {
            Some(self.database_info.pre_apply(new_job, fragment_changes))
        } else {
            None
        };

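        // Register subscribers introduced by this command: an explicit subscription, or one
        // implicit snapshot-backfill subscriber per upstream MV table.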
        match &command {
            Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(*retention_second),
                );
            }
            Some(Command::CreateStreamingJob {
                info,
                job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
                ..
            }) => {
                for upstream_mv_table_id in snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .keys()
                {
                    self.database_info.register_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        info.streaming_job.id().as_subscriber_id(),
                        SubscriberType::SnapshotBackfill,
                    );
                }
            }
            _ => {}
        };

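        // Collect what this barrier touches: the tables to commit, the actors the command
        // creates, and the actors on each node expected to collect the barrier.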
        let mut table_ids_to_commit: HashSet<_> = self.database_info.existing_table_ids().collect();
        let mut actors_to_create = command.as_ref().and_then(|command| {
            command.actors_to_create(&self.database_info, &mut edges, control_stream_manager)
        });
        let mut node_actors =
            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());

        if let Some(post_apply_changes) = post_apply_changes {
            self.database_info.post_apply(post_apply_changes);
        }

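        // Pause/Resume toggles the paused flag; the mutation below is built against the
        // pre-command paused state.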
        let prev_is_paused = self.state.is_paused();
        let curr_is_paused = match command {
            Some(Command::Pause) => true,
            Some(Command::Resume) => false,
            _ => prev_is_paused,
        };
        self.state.set_is_paused(curr_is_paused);

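        // Translate the command into a barrier mutation, if it carries one.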
        let mutation = if let Some(c) = &command {
            c.to_mutation(
                prev_is_paused,
                &mut edges,
                control_stream_manager,
                &mut self.database_info,
            )?
        } else {
            None
        };

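        // With no explicit mutation, a checkpoint barrier may instead carry the mutation
        // that merges finished snapshot-backfill jobs back into the upstream graph, or kick
        // off pending fragment backfills.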
        let mut finished_snapshot_backfill_jobs = HashSet::new();
        let mutation = match mutation {
            Some(mutation) => Some(mutation),
            None => {
                let mut finished_snapshot_backfill_job_info = HashMap::new();
                if barrier_info.kind.is_checkpoint() {
                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
                        if creating_job.should_merge_to_upstream() {
                            let info = creating_job
                                .start_consume_upstream(control_stream_manager, barrier_info)?;
                            finished_snapshot_backfill_job_info
                                .try_insert(job_id, info)
                                .expect("non-duplicated");
                        }
                    }
                }

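                // Merge each finished job into the main graph: drop its implicit
                // subscriptions, re-create its actors under the database's partial graph
                // with fresh actor ids, and rewire the upstream dispatchers accordingly.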
                if !finished_snapshot_backfill_job_info.is_empty() {
                    let actors_to_create = actors_to_create.get_or_insert_default();
                    let mut subscriptions_to_drop = vec![];
                    let mut dispatcher_update = vec![];
                    let mut actor_splits = HashMap::new();
                    for (job_id, info) in finished_snapshot_backfill_job_info {
                        finished_snapshot_backfill_jobs.insert(job_id);
                        subscriptions_to_drop.extend(
                            info.snapshot_backfill_upstream_tables.iter().map(
                                |upstream_table_id| PbSubscriptionUpstreamInfo {
                                    subscriber_id: job_id.as_subscriber_id(),
                                    upstream_mv_table_id: *upstream_table_id,
                                },
                            ),
                        );
                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
                            assert_matches!(
                                self.database_info.unregister_subscriber(
                                    upstream_mv_table_id.as_job_id(),
                                    job_id.as_subscriber_id()
                                ),
                                Some(SubscriberType::SnapshotBackfill)
                            );
                        }

                        table_ids_to_commit.extend(
                            info.fragment_infos
                                .values()
                                .flat_map(|fragment| fragment.state_table_ids.iter())
                                .copied(),
                        );

                        // Assign fresh global actor ids to the job's actors, then rewrite
                        // the stream actors and fragment infos to use the new ids.
                        let actor_len = info
                            .fragment_infos
                            .values()
                            .map(|fragment| fragment.actors.len() as u64)
                            .sum();
                        let id_gen = GlobalActorIdGen::new(
                            control_stream_manager.env.actor_id_generator(),
                            actor_len,
                        );
                        let mut next_local_actor_id = 0;
                        let actor_mapping: HashMap<_, _> = info
                            .fragment_infos
                            .values()
                            .flat_map(|fragment| fragment.actors.keys())
                            .map(|old_actor_id| {
                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
                                next_local_actor_id += 1;
                                (*old_actor_id, new_actor_id.as_global_id())
                            })
                            .collect();
                        let actor_mapping = &actor_mapping;
                        let new_stream_actors: HashMap<_, _> = info
                            .stream_actors
                            .into_iter()
                            .map(|(old_actor_id, mut actor)| {
                                let new_actor_id = actor_mapping[&old_actor_id];
                                actor.actor_id = new_actor_id;
                                (new_actor_id, actor)
                            })
                            .collect();
                        let new_fragment_info: HashMap<_, _> = info
                            .fragment_infos
                            .into_iter()
                            .map(|(fragment_id, mut fragment)| {
                                let actors = take(&mut fragment.actors);
                                fragment.actors = actors
                                    .into_iter()
                                    .map(|(old_actor_id, actor)| {
                                        let new_actor_id = actor_mapping[&old_actor_id];
                                        (new_actor_id, actor)
                                    })
                                    .collect();
                                (fragment_id, fragment)
                            })
                            .collect();
                        actor_splits.extend(
                            new_fragment_info
                                .values()
                                .flat_map(|fragment| &fragment.actors)
                                .map(|(actor_id, actor)| {
                                    (
                                        *actor_id,
                                        ConnectorSplits {
                                            splits: actor
                                                .splits
                                                .iter()
                                                .map(ConnectorSplit::from)
                                                .collect(),
                                        },
                                    )
                                }),
                        );
                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
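                        // Rebuild the dispatch edges between the upstream fragments and the
                        // re-created fragments of this job.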
                        let mut edge_builder = FragmentEdgeBuilder::new(
                            info.upstream_fragment_downstreams
                                .keys()
                                .map(|upstream_fragment_id| {
                                    self.database_info.fragment(*upstream_fragment_id)
                                })
                                .chain(new_fragment_info.values())
                                .map(|info| (info, partial_graph_id)),
                            control_stream_manager,
                        );
                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
                        edge_builder.add_relations(&info.downstreams);
                        let mut edges = edge_builder.build();
                        let new_actors_to_create = edges.collect_actors_to_create(
                            new_fragment_info.values().map(|fragment| {
                                (
                                    fragment.fragment_id,
                                    &fragment.nodes,
                                    fragment.actors.iter().map(|(actor_id, actor)| {
                                        (&new_stream_actors[actor_id], actor.worker_id)
                                    }),
                                    [],
                                )
                            }),
                        );
                        // For each upstream dispatcher, drop the old (pre-remap) downstream
                        // actor ids and add the newly assigned ones.
                        dispatcher_update.extend(
                            info.upstream_fragment_downstreams.keys().flat_map(
                                |upstream_fragment_id| {
                                    let new_actor_dispatchers = edges
                                        .dispatchers
                                        .remove(upstream_fragment_id)
                                        .expect("should exist");
                                    new_actor_dispatchers.into_iter().flat_map(
                                        |(upstream_actor_id, dispatchers)| {
                                            dispatchers.into_iter().map(move |dispatcher| {
                                                PbDispatcherUpdate {
                                                    actor_id: upstream_actor_id,
                                                    dispatcher_id: dispatcher.dispatcher_id,
                                                    hash_mapping: dispatcher.hash_mapping,
                                                    removed_downstream_actor_id: dispatcher
                                                        .downstream_actor_id
                                                        .iter()
                                                        .map(|new_downstream_actor_id| {
                                                            actor_mapping
                                                                .iter()
                                                                .find_map(
                                                                    |(old_actor_id, new_actor_id)| {
                                                                        (new_downstream_actor_id
                                                                            == new_actor_id)
                                                                            .then_some(*old_actor_id)
                                                                    },
                                                                )
                                                                .expect("should exist")
                                                        })
                                                        .collect(),
                                                    added_downstream_actor_id: dispatcher
                                                        .downstream_actor_id,
                                                }
                                            })
                                        },
                                    )
                                },
                            ),
                        );
                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
                        for (worker_id, worker_actors) in new_actors_to_create {
                            node_actors.entry(worker_id).or_default().extend(
                                worker_actors.values().flat_map(|(_, actors, _)| {
                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
                                }),
                            );
                            actors_to_create
                                .entry(worker_id)
                                .or_default()
                                .extend(worker_actors);
                        }
                        self.database_info.add_existing(InflightStreamingJobInfo {
                            job_id,
                            fragment_infos: new_fragment_info,
                            subscribers: Default::default(),
                            status: CreateStreamingJobStatus::Created,
                            cdc_table_backfill_tracker: None,
                        });
                    }

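                    // Emit a single Update mutation covering all finished jobs: rewired
                    // dispatchers, the re-created actors' source splits, and the implicit
                    // subscriptions to drop.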
                    Some(PbMutation::Update(PbUpdateMutation {
                        dispatcher_update,
                        merge_update: vec![],
                        actor_vnode_bitmap_update: Default::default(),
                        dropped_actors: vec![],
                        actor_splits,
                        actor_new_dispatchers: Default::default(),
                        actor_cdc_table_snapshot_splits: None,
                        sink_schema_change: Default::default(),
                        subscriptions_to_drop,
                    }))
                } else {
                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
                    if fragment_ids.is_empty() {
                        None
                    } else {
                        Some(PbMutation::StartFragmentBackfill(
                            PbStartFragmentBackfillMutation { fragment_ids },
                        ))
                    }
                }
            }
        };

        #[expect(clippy::collapsible_if)]
        if let Some(Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id,
        }) = command
        {
            if self
                .database_info
                .unregister_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                )
                .is_none()
            {
                warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
            }
        }

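        // Forward the barrier to every still-creating snapshot-backfill job. A Throttle
        // command targeting such a job is delivered here as a mutation on its own partial
        // graph rather than on the database graph.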
        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
            if !finished_snapshot_backfill_jobs.contains(job_id) {
                let throttle_mutation = if let Some(Command::Throttle { jobs, config }) = &command
                    && jobs.contains(job_id)
                {
                    assert_eq!(
                        jobs.len(),
                        1,
                        "should not alter rate limit of snapshot backfill job with other jobs"
                    );
                    Some((
                        Mutation::Throttle(ThrottleMutation {
                            fragment_throttle: config
                                .iter()
                                .map(|(fragment_id, config)| (*fragment_id, *config))
                                .collect(),
                        }),
                        take(notifiers),
                    ))
                } else {
                    None
                };
                creating_job.on_new_upstream_barrier(
                    control_stream_manager,
                    barrier_info,
                    throttle_mutation,
                )?;
            }
        }

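        // Inject the barrier into this database's own partial graph and report back what
        // the caller must track for collection and commit.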
        let node_to_collect = control_stream_manager.inject_barrier(
            self.database_id,
            None,
            mutation,
            barrier_info,
            &node_actors,
            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
            actors_to_create,
        )?;

        Ok(ApplyCommandInfo {
            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
            table_ids_to_commit,
            jobs_to_wait: finished_snapshot_backfill_jobs,
            node_to_collect,
            command: command
                .map(Command::into_post_collect)
                .unwrap_or(PostCollectCommand::barrier()),
        })
    }
}