use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};
use std::mem::take;

use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::{
    PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation, ThrottleMutation,
};
use tracing::warn;

use crate::MetaResult;
use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
use crate::barrier::command::PostCollectCommand;
use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
use crate::barrier::edge_builder::FragmentEdgeBuilder;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightStreamingJobInfo, SubscriberType,
};
use crate::barrier::notifier::Notifier;
use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
use crate::barrier::utils::NodeToCollect;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;
use crate::stream::{GlobalActorIdGen, fill_snapshot_backfill_epoch};

/// In-memory state for issuing barriers: the `prev_epoch` to use for the next barrier,
/// the non-checkpoint barriers accumulated since the last checkpoint, and whether the
/// dataflow is currently paused.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The `prev_epoch` of the next barrier to inject, i.e. the `curr_epoch` of the
    /// latest injected barrier.
    in_flight_prev_epoch: TracedEpoch,

    /// `prev_epoch`s of the barriers issued since the last checkpoint barrier. Drained
    /// into `BarrierKind::Checkpoint` when the next checkpoint barrier is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the dataflow is currently paused, toggled by `Command::Pause` and
    /// `Command::Resume`.
    is_paused: bool,
}

impl BarrierWorkerState {
    pub(super) fn new() -> Self {
        Self {
            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
            pending_non_checkpoint_barriers: vec![],
            is_paused: false,
        }
    }

    pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
        Self {
            in_flight_prev_epoch,
            pending_non_checkpoint_barriers: vec![],
            is_paused,
        }
    }

    pub fn is_paused(&self) -> bool {
        self.is_paused
    }

    fn set_is_paused(&mut self, is_paused: bool) {
        if self.is_paused != is_paused {
            tracing::info!(
                currently_paused = self.is_paused,
                newly_paused = is_paused,
                "update paused state"
            );
            self.is_paused = is_paused;
        }
    }

    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
        &self.in_flight_prev_epoch
    }

    /// Returns the `BarrierInfo` of the next barrier and advances `in_flight_prev_epoch`
    /// to `curr_epoch`. For a checkpoint barrier, the pending non-checkpoint epochs are
    /// drained into `BarrierKind::Checkpoint`.
    pub fn next_barrier_info(
        &mut self,
        is_checkpoint: bool,
        curr_epoch: TracedEpoch,
    ) -> BarrierInfo {
        assert!(
            self.in_flight_prev_epoch.value() < curr_epoch.value(),
            "curr epoch regress. {} >= {}",
            self.in_flight_prev_epoch.value(),
            curr_epoch.value()
        );
        let prev_epoch = self.in_flight_prev_epoch.clone();
        self.in_flight_prev_epoch = curr_epoch.clone();
        self.pending_non_checkpoint_barriers
            .push(prev_epoch.value().0);
        let kind = if is_checkpoint {
            let epochs = take(&mut self.pending_non_checkpoint_barriers);
            BarrierKind::Checkpoint(epochs)
        } else {
            BarrierKind::Barrier
        };
        BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind,
        }
    }
}

/// Outputs of [`DatabaseCheckpointControl::apply_command`] for a newly injected barrier.
pub(super) struct ApplyCommandInfo {
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    pub table_ids_to_commit: HashSet<TableId>,
    pub jobs_to_wait: HashSet<JobId>,
    pub node_to_collect: NodeToCollect,
    pub command: PostCollectCommand,
}

impl DatabaseCheckpointControl {
    /// Applies `command` for the next barrier: updates the in-memory database info,
    /// builds the barrier mutation, and injects the barrier to the relevant compute
    /// nodes through `control_stream_manager`.
    pub(super) fn apply_command(
        &mut self,
        mut command: Option<Command>,
        notifiers: &mut Vec<Notifier>,
        barrier_info: &BarrierInfo,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<ApplyCommandInfo> {
        // A reschedule intent must already be resolved into a concrete plan at this
        // point: panic in debug builds, return a recoverable error in release builds.
        debug_assert!(
            !matches!(
                command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent must be resolved before apply"
        );
        if matches!(
            command,
            Some(Command::RescheduleIntent {
                reschedule_plan: None,
                ..
            })
        ) {
            bail!("reschedule intent must be resolved before apply");
        }
        let mut edges = self
            .database_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        if let &mut Some(Command::CreateStreamingJob {
            ref mut job_type,
            ref mut info,
            ref cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    assert!(!self.state.is_paused());
                    let snapshot_epoch = barrier_info.prev_epoch();
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(snapshot_epoch),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        CreateSnapshotBackfillJobCommandInfo {
                            info: info.clone(),
                            snapshot_backfill_info: snapshot_backfill_info.clone(),
                            cross_db_snapshot_backfill_info: cross_db_snapshot_backfill_info
                                .clone(),
                        },
                        take(notifiers),
                        snapshot_backfill_upstream_tables,
                        snapshot_epoch,
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.database_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.fragment_infos_with_job_id());

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

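        // Note: fragment changes of snapshot-backfill jobs are not pre-applied here;
        // such jobs are driven separately through `creating_streaming_job_controls`
        // until they are merged into the upstream graph.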
        let post_apply_changes = if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = command
        {
            None
        } else if let Some((new_job, fragment_changes)) =
            command.as_ref().and_then(Command::fragment_changes)
        {
            Some(self.database_info.pre_apply(new_job, fragment_changes))
        } else {
            None
        };

        match &command {
            Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(*retention_second),
                );
            }
            Some(Command::CreateStreamingJob {
                info,
                job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
                ..
            }) => {
                for upstream_mv_table_id in snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .keys()
                {
                    self.database_info.register_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        info.streaming_job.id().as_subscriber_id(),
                        SubscriberType::SnapshotBackfill,
                    );
                }
            }
            _ => {}
        };

        let mut table_ids_to_commit: HashSet<_> = self.database_info.existing_table_ids().collect();
        let mut actors_to_create = command.as_ref().and_then(|command| {
            command.actors_to_create(&self.database_info, &mut edges, control_stream_manager)
        });
        let mut node_actors =
            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());

        if let Some(post_apply_changes) = post_apply_changes {
            self.database_info.post_apply(post_apply_changes);
        }

        let prev_is_paused = self.state.is_paused();
        let curr_is_paused = match command {
            Some(Command::Pause) => true,
            Some(Command::Resume) => false,
            _ => prev_is_paused,
        };
        self.state.set_is_paused(curr_is_paused);

        let mutation = if let Some(c) = &command {
            c.to_mutation(
                prev_is_paused,
                &mut edges,
                control_stream_manager,
                &mut self.database_info,
            )?
        } else {
            None
        };

        let mut finished_snapshot_backfill_jobs = HashSet::new();
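        // If the command itself yields no mutation, one may still be needed: either to
        // merge snapshot-backfill jobs that finished at this checkpoint back into the
        // upstream graph, or to start pending fragment backfills.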
        let mutation = match mutation {
            Some(mutation) => Some(mutation),
            None => {
                let mut finished_snapshot_backfill_job_info = HashMap::new();
                if barrier_info.kind.is_checkpoint() {
                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
                        if creating_job.should_merge_to_upstream() {
                            let info = creating_job
                                .start_consume_upstream(control_stream_manager, barrier_info)?;
                            finished_snapshot_backfill_job_info
                                .try_insert(job_id, info)
                                .expect("non-duplicated");
                        }
                    }
                }

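                // Merge each finished snapshot-backfill job into the upstream graph:
                // re-create its actors under freshly allocated global actor ids,
                // register the new fragments as existing, rebuild the dispatch edges
                // from the upstream fragments, and emit a single `Update` mutation that
                // rewires dispatchers and drops the backfill subscriptions.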
                if !finished_snapshot_backfill_job_info.is_empty() {
                    let actors_to_create = actors_to_create.get_or_insert_default();
                    let mut subscriptions_to_drop = vec![];
                    let mut dispatcher_update = vec![];
                    let mut actor_splits = HashMap::new();
                    for (job_id, info) in finished_snapshot_backfill_job_info {
                        finished_snapshot_backfill_jobs.insert(job_id);
                        subscriptions_to_drop.extend(
                            info.snapshot_backfill_upstream_tables.iter().map(
                                |upstream_table_id| PbSubscriptionUpstreamInfo {
                                    subscriber_id: job_id.as_subscriber_id(),
                                    upstream_mv_table_id: *upstream_table_id,
                                },
                            ),
                        );
                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
                            assert_matches!(
                                self.database_info.unregister_subscriber(
                                    upstream_mv_table_id.as_job_id(),
                                    job_id.as_subscriber_id()
                                ),
                                Some(SubscriberType::SnapshotBackfill)
                            );
                        }

                        table_ids_to_commit.extend(
                            info.fragment_infos
                                .values()
                                .flat_map(|fragment| fragment.state_table_ids.iter())
                                .copied(),
                        );

                        let actor_len = info
                            .fragment_infos
                            .values()
                            .map(|fragment| fragment.actors.len() as u64)
                            .sum();
                        let id_gen = GlobalActorIdGen::new(
                            control_stream_manager.env.actor_id_generator(),
                            actor_len,
                        );
                        let mut next_local_actor_id = 0;
                        let actor_mapping: HashMap<_, _> = info
                            .fragment_infos
                            .values()
                            .flat_map(|fragment| fragment.actors.keys())
                            .map(|old_actor_id| {
                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
                                next_local_actor_id += 1;
                                (*old_actor_id, new_actor_id.as_global_id())
                            })
                            .collect();
                        let actor_mapping = &actor_mapping;
                        let new_stream_actors: HashMap<_, _> = info
                            .stream_actors
                            .into_iter()
                            .map(|(old_actor_id, mut actor)| {
                                let new_actor_id = actor_mapping[&old_actor_id];
                                actor.actor_id = new_actor_id;
                                (new_actor_id, actor)
                            })
                            .collect();
                        let new_fragment_info: HashMap<_, _> = info
                            .fragment_infos
                            .into_iter()
                            .map(|(fragment_id, mut fragment)| {
                                let actors = take(&mut fragment.actors);
                                fragment.actors = actors
                                    .into_iter()
                                    .map(|(old_actor_id, actor)| {
                                        let new_actor_id = actor_mapping[&old_actor_id];
                                        (new_actor_id, actor)
                                    })
                                    .collect();
                                (fragment_id, fragment)
                            })
                            .collect();
                        actor_splits.extend(
                            new_fragment_info
                                .values()
                                .flat_map(|fragment| &fragment.actors)
                                .map(|(actor_id, actor)| {
                                    (
                                        *actor_id,
                                        ConnectorSplits {
                                            splits: actor
                                                .splits
                                                .iter()
                                                .map(ConnectorSplit::from)
                                                .collect(),
                                        },
                                    )
                                }),
                        );
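                        // Rebuild the dispatch edges between the upstream fragments and
                        // the re-created fragments, derive the dispatcher updates for
                        // the upstream actors (translating downstream actor ids back to
                        // the old ids for removal), and collect the actors to create.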
                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
                        let mut edge_builder = FragmentEdgeBuilder::new(
                            info.upstream_fragment_downstreams
                                .keys()
                                .map(|upstream_fragment_id| {
                                    self.database_info.fragment(*upstream_fragment_id)
                                })
                                .chain(new_fragment_info.values())
                                .map(|info| (info, partial_graph_id)),
                            control_stream_manager,
                        );
                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
                        edge_builder.add_relations(&info.downstreams);
                        let mut edges = edge_builder.build();
                        let new_actors_to_create = edges.collect_actors_to_create(
                            new_fragment_info.values().map(|fragment| {
                                (
                                    fragment.fragment_id,
                                    &fragment.nodes,
                                    fragment.actors.iter().map(|(actor_id, actor)| {
                                        (&new_stream_actors[actor_id], actor.worker_id)
                                    }),
                                    [],
                                )
                            }),
                        );
                        dispatcher_update.extend(
                            info.upstream_fragment_downstreams.keys().flat_map(
                                |upstream_fragment_id| {
                                    let new_actor_dispatchers = edges
                                        .dispatchers
                                        .remove(upstream_fragment_id)
                                        .expect("should exist");
                                    new_actor_dispatchers.into_iter().flat_map(
                                        |(upstream_actor_id, dispatchers)| {
                                            dispatchers.into_iter().map(move |dispatcher| {
                                                PbDispatcherUpdate {
                                                    actor_id: upstream_actor_id,
                                                    dispatcher_id: dispatcher.dispatcher_id,
                                                    hash_mapping: dispatcher.hash_mapping,
                                                    removed_downstream_actor_id: dispatcher
                                                        .downstream_actor_id
                                                        .iter()
                                                        .map(|new_downstream_actor_id| {
                                                            actor_mapping
                                                                .iter()
                                                                .find_map(
                                                                    |(old_actor_id, new_actor_id)| {
                                                                        (new_downstream_actor_id
                                                                            == new_actor_id)
                                                                            .then_some(*old_actor_id)
                                                                    },
                                                                )
                                                                .expect("should exist")
                                                        })
                                                        .collect(),
                                                    added_downstream_actor_id: dispatcher
                                                        .downstream_actor_id,
                                                }
                                            })
                                        },
                                    )
                                },
                            ),
                        );
                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
                        for (worker_id, worker_actors) in new_actors_to_create {
                            node_actors.entry(worker_id).or_default().extend(
                                worker_actors.values().flat_map(|(_, actors, _)| {
                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
                                }),
                            );
                            actors_to_create
                                .entry(worker_id)
                                .or_default()
                                .extend(worker_actors);
                        }
                        self.database_info.add_existing(InflightStreamingJobInfo {
                            job_id,
                            fragment_infos: new_fragment_info,
                            subscribers: Default::default(),
                            status: CreateStreamingJobStatus::Created,
                            cdc_table_backfill_tracker: None,
                        });
                    }

                    Some(PbMutation::Update(PbUpdateMutation {
                        dispatcher_update,
                        merge_update: vec![],
                        actor_vnode_bitmap_update: Default::default(),
                        dropped_actors: vec![],
                        actor_splits,
                        actor_new_dispatchers: Default::default(),
                        actor_cdc_table_snapshot_splits: None,
                        sink_schema_change: Default::default(),
                        subscriptions_to_drop,
                    }))
                } else {
                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
                    if fragment_ids.is_empty() {
                        None
                    } else {
                        Some(PbMutation::StartFragmentBackfill(
                            PbStartFragmentBackfillMutation { fragment_ids },
                        ))
                    }
                }
            }
        };

        #[expect(clippy::collapsible_if)]
        if let Some(Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id,
        }) = command
        {
            if self
                .database_info
                .unregister_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                )
                .is_none()
            {
                warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
            }
        }

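        // Forward the barrier to snapshot-backfill jobs that are still in progress
        // (i.e. not merged above). A `Throttle` command that targets such a job is
        // passed along with the barrier here, together with the taken notifiers.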
        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
            if !finished_snapshot_backfill_jobs.contains(job_id) {
                let throttle_mutation = if let Some(Command::Throttle { jobs, config }) = &command
                    && jobs.contains(job_id)
                {
                    assert_eq!(
                        jobs.len(),
                        1,
                        "should not alter rate limit of snapshot backfill job with other jobs"
                    );
                    Some((
                        Mutation::Throttle(ThrottleMutation {
                            fragment_throttle: config
                                .iter()
                                .map(|(fragment_id, config)| (*fragment_id, *config))
                                .collect(),
                        }),
                        take(notifiers),
                    ))
                } else {
                    None
                };
                creating_job.on_new_upstream_barrier(
                    control_stream_manager,
                    barrier_info,
                    throttle_mutation,
                )?;
            }
        }

        let node_to_collect = control_stream_manager.inject_barrier(
            self.database_id,
            None,
            mutation,
            barrier_info,
            &node_actors,
            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
            actors_to_create,
        )?;

        Ok(ApplyCommandInfo {
            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
            table_ids_to_commit,
            jobs_to_wait: finished_snapshot_backfill_jobs,
            node_to_collect,
            command: command
                .map(Command::into_post_collect)
                .unwrap_or(PostCollectCommand::barrier()),
        })
    }
}