1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::ops::Bound::{Excluded, Unbounded};
19use std::sync::atomic::AtomicU32;
20use std::task::Poll;
21
22use anyhow::anyhow;
23use fail::fail_point;
24use itertools::Itertools;
25use risingwave_common::catalog::{DatabaseId, TableId};
26use risingwave_common::id::JobId;
27use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
30use risingwave_meta_model::WorkerId;
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::hummock::HummockVersionStats;
33use risingwave_pb::id::{FragmentId, PartialGraphId};
34use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_service::BarrierCompleteResponse;
37use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
38use tracing::{debug, warn};
39
40use crate::barrier::cdc_progress::CdcProgress;
41use crate::barrier::checkpoint::independent_job::{
42 BatchRefreshJobTriggerContext, IndependentCheckpointJobControl,
43};
44use crate::barrier::checkpoint::recovery::{
45 DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
46 RecoveringStateAction,
47};
48use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
49use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
50use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
51use crate::barrier::notifier::Notifier;
52use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphStat};
53use crate::barrier::progress::TrackingJob;
54use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
55use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
56use crate::barrier::utils::{BarrierItemCollector, collect_independent_job_commit_epoch_info};
57use crate::barrier::{
58 BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
59};
60use crate::controller::fragment::InflightFragmentInfo;
61use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
62use crate::manager::MetaSrvEnv;
63
64fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
65 let mut has_unreschedulable_scan = false;
66 visit_stream_node_cont(&fragment.nodes, |node| {
67 if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
68 let scan_type = stream_scan.stream_scan_type();
69 if !scan_type.is_reschedulable(true) {
70 has_unreschedulable_scan = true;
71 return false;
72 }
73 }
74 true
75 });
76 has_unreschedulable_scan
77}
78
79fn collect_fragment_upstream_fragment_ids(
80 fragment: &InflightFragmentInfo,
81 upstream_fragment_ids: &mut HashSet<FragmentId>,
82) {
83 visit_stream_node_cont(&fragment.nodes, |node| {
84 if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
85 upstream_fragment_ids.insert(merge.upstream_fragment_id);
86 }
87 true
88 });
89}
90
91use crate::model::ActorId;
92use crate::rpc::metrics::GLOBAL_META_METRICS;
93use crate::{MetaError, MetaResult};
94
/// Top-level checkpoint bookkeeping that holds one status entry per database.
pub(crate) struct CheckpointControl {
    /// Meta server environment; the source of `opts` and of the shared actor infos.
    pub(crate) env: MetaSrvEnv,
    /// Per-database status: either running or recovering.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Latest hummock version stats; overwritten in `ack_completed` from the completed output.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// Initialised from `env.opts.in_flight_barrier_nums`. A barrier that carries no command
    /// is skipped once a database has at least this many in-flight barriers.
    pub(crate) in_flight_barrier_nums: usize,
}
102
103impl CheckpointControl {
104 pub fn new(env: MetaSrvEnv) -> Self {
105 Self {
106 in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
107 env,
108 databases: Default::default(),
109 hummock_version_stats: Default::default(),
110 }
111 }
112
113 pub(crate) fn recover(
114 databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
115 failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, hummock_version_stats: HummockVersionStats,
117 env: MetaSrvEnv,
118 ) -> Self {
119 env.shared_actor_infos()
120 .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
121 Self {
122 in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
123 env,
124 databases: databases
125 .into_iter()
126 .map(|(database_id, control)| {
127 (
128 database_id,
129 DatabaseCheckpointControlStatus::Running(control),
130 )
131 })
132 .chain(failed_databases.into_iter().map(
133 |(database_id, resetting_partial_graphs)| {
134 (
135 database_id,
136 DatabaseCheckpointControlStatus::Recovering(
137 DatabaseRecoveringState::new_resetting(
138 database_id,
139 resetting_partial_graphs,
140 ),
141 ),
142 )
143 },
144 ))
145 .collect(),
146 hummock_version_stats,
147 }
148 }
149
150 pub(crate) fn ack_completed(
151 &mut self,
152 partial_graph_manager: &mut PartialGraphManager,
153 output: BarrierCompleteOutput,
154 ) {
155 self.hummock_version_stats = output.hummock_version_stats;
156 for (database_id, (command_prev_epoch, independent_job_epochs)) in output.epochs_to_ack {
157 self.databases
158 .get_mut(&database_id)
159 .expect("should exist")
160 .expect_running("should have wait for completing command before enter recovery")
161 .ack_completed(
162 partial_graph_manager,
163 command_prev_epoch,
164 independent_job_epochs,
165 );
166 }
167 }
168
169 pub(crate) fn next_complete_barrier_task(
170 &mut self,
171 periodic_barriers: &mut PeriodicBarriers,
172 partial_graph_manager: &mut PartialGraphManager,
173 ) -> Option<CompleteBarrierTask> {
174 let mut task = None;
175 for database in self.databases.values_mut() {
176 let Some(database) = database.running_state_mut() else {
177 continue;
178 };
179 database.next_complete_barrier_task(
180 periodic_barriers,
181 partial_graph_manager,
182 &mut task,
183 &self.hummock_version_stats,
184 );
185 }
186 task
187 }
188
189 pub(crate) fn barrier_collected(
190 &mut self,
191 partial_graph_id: PartialGraphId,
192 collected_barrier: CollectedBarrier<'_>,
193 periodic_barriers: &mut PeriodicBarriers,
194 ) -> MetaResult<()> {
195 let (database_id, _) = from_partial_graph_id(partial_graph_id);
196 let database_status = self.databases.get_mut(&database_id).expect("should exist");
197 match database_status {
198 DatabaseCheckpointControlStatus::Running(database) => {
199 database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
200 }
201 DatabaseCheckpointControlStatus::Recovering(_) => {
202 if cfg!(debug_assertions) {
203 panic!(
204 "receive collected barrier {:?} on recovering database {} from partial graph {}",
205 collected_barrier, database_id, partial_graph_id
206 );
207 } else {
208 warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
209 }
210 Ok(())
211 }
212 }
213 }
214
215 pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
216 self.databases.iter().filter_map(|(database_id, database)| {
217 database.running_state().is_none().then_some(*database_id)
218 })
219 }
220
221 pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
222 self.databases.iter().filter_map(|(database_id, database)| {
223 database.running_state().is_some().then_some(*database_id)
224 })
225 }
226
227 pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
228 self.databases
229 .get(&database_id)
230 .and_then(|database| database.running_state())
231 .map(|database| &database.database_info)
232 }
233
234 pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
235 self.databases
236 .values()
237 .any(|database| database.may_have_snapshot_backfilling_jobs())
238 }
239
/// Handles a newly scheduled barrier for one database.
///
/// With a command:
/// 1. For `Command::CreateStreamingJob`, every entry of
///    `upstream_mv_table_id_to_backfill_epoch` is filled with the `committed_epoch` of the
///    running database that contains the upstream job. If an entry is still `None`
///    afterwards, all notifiers receive `notify_start_failed` and the call returns `Ok(())`.
/// 2. The target database is resolved. A missing database is only created for a
///    `CreateStreamingJob` whose job type is `CreateStreamingJobType::Normal`.
/// 3. The command is forwarded to `DatabaseCheckpointControl::handle_new_barrier`.
///
/// Without a command, the barrier is silently skipped (`Ok(())`) when the database is
/// absent, is not running, or already has at least `self.in_flight_barrier_nums` in-flight
/// barriers.
///
/// # Panics
///
/// Panics if a command arrives for an existing database that is not running. Debug builds
/// additionally panic on commands that are unexpected for a non-existing database; release
/// builds fail the notifiers instead.
pub(crate) fn handle_new_barrier(
    &mut self,
    new_barrier: NewBarrier,
    partial_graph_manager: &mut PartialGraphManager,
    worker_nodes: &HashMap<WorkerId, WorkerNode>,
) -> MetaResult<()> {
    let NewBarrier {
        database_id,
        command,
        span,
        checkpoint,
    } = new_barrier;

    if let Some((mut command, notifiers)) = command {
        if let &mut Command::CreateStreamingJob {
            ref mut cross_db_snapshot_backfill_info,
            ref info,
            ..
        } = &mut command
        {
            for (table_id, snapshot_epoch) in
                &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                // Look for the running database that owns the upstream table and use its
                // committed epoch as the snapshot epoch. The search stops at the first
                // owner, even if that owner has no committed epoch yet.
                for database in self.databases.values() {
                    if let Some(database) = database.running_state()
                        && database.database_info.contains_job(table_id.as_job_id())
                    {
                        if let Some(committed_epoch) = database.committed_epoch {
                            *snapshot_epoch = Some(committed_epoch);
                        }
                        break;
                    }
                }
                // NOTE(review): this branch is also taken when the owning database was found
                // but has no committed epoch, although the message says "not found".
                if snapshot_epoch.is_none() {
                    let table_id = *table_id;
                    warn!(
                        ?cross_db_snapshot_backfill_info,
                        ?table_id,
                        ?info,
                        "database of cross db upstream table not found"
                    );
                    let err: MetaError =
                        anyhow!("database of cross db upstream table {} not found", table_id)
                            .into();
                    for notifier in notifiers {
                        notifier.notify_start_failed(err.clone());
                    }

                    return Ok(());
                }
            }
        }

        let database = match self.databases.entry(database_id) {
            // Existing database: it must be running to accept a command.
            Entry::Occupied(entry) => entry
                .into_mut()
                .expect_running("should not have command when not running"),
            Entry::Vacant(entry) => match &command {
                Command::CreateStreamingJob { info, job_type, .. } => {
                    // Only a normal streaming job may be the first job of a database.
                    let CreateStreamingJobType::Normal = job_type else {
                        if cfg!(debug_assertions) {
                            panic!(
                                "unexpected first job of type {job_type:?} with info {info:?}"
                            );
                        } else {
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                            }
                            return Ok(());
                        }
                    };
                    // Create the database control and register its database-level partial
                    // graph (no independent job id) together with its metrics.
                    let new_database = DatabaseCheckpointControl::new(
                        database_id,
                        self.env.shared_actor_infos().clone(),
                    );
                    let adder = partial_graph_manager.add_partial_graph(
                        to_partial_graph_id(database_id, None),
                        DatabaseCheckpointControlMetrics::new(database_id),
                    );
                    adder.added();
                    entry
                        .insert(DatabaseCheckpointControlStatus::Running(new_database))
                        .expect_running("just initialized as running")
                }
                // These commands are acknowledged immediately for a database that does not
                // exist: the notifiers are told the command started and was collected.
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::DropSubscription { .. } => {
                    for mut notifier in notifiers {
                        notifier.notify_started();
                        notifier.notify_collected();
                    }
                    warn!(?command, "skip command for empty database");
                    return Ok(());
                }
                // Any other command is unexpected for a non-existing database.
                Command::RescheduleIntent { .. }
                | Command::ReplaceStreamJob(_)
                | Command::SourceChangeSplit(_)
                | Command::Throttle { .. }
                | Command::CreateSubscription { .. }
                | Command::AlterSubscriptionRetention { .. }
                | Command::ConnectorPropsChange(_)
                | Command::Refresh { .. }
                | Command::ListFinish { .. }
                | Command::LoadFinish { .. }
                | Command::ResetSource { .. }
                | Command::ResumeBackfill { .. }
                | Command::InjectSourceOffsets { .. } => {
                    if cfg!(debug_assertions) {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                            database_id, command
                        )
                    } else {
                        warn!(%database_id, ?command, "database not exist when handling command");
                        for notifier in notifiers {
                            notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                        }
                        return Ok(());
                    }
                }
            },
        };

        database.handle_new_barrier(
            Some((command, notifiers)),
            checkpoint,
            span,
            partial_graph_manager,
            &self.hummock_version_stats,
            worker_nodes,
        )
    } else {
        let database = match self.databases.entry(database_id) {
            Entry::Occupied(entry) => entry.into_mut(),
            Entry::Vacant(_) => {
                // No such database: there is nothing to inject a barrier into.
                return Ok(());
            }
        };
        let Some(database) = database.running_state_mut() else {
            // The database is not running: skip the barrier.
            return Ok(());
        };
        if partial_graph_manager.inflight_barrier_num(database.partial_graph_id)
            >= self.in_flight_barrier_nums
        {
            // The in-flight limit is reached: skip a barrier that carries no command.
            return Ok(());
        }
        database.handle_new_barrier(
            None,
            checkpoint,
            span,
            partial_graph_manager,
            &self.hummock_version_stats,
            worker_nodes,
        )
    }
}
403
404 pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
405 let mut progress = HashMap::new();
406 for status in self.databases.values() {
407 let Some(database_checkpoint_control) = status.running_state() else {
408 continue;
409 };
410 progress.extend(
412 database_checkpoint_control
413 .database_info
414 .gen_backfill_progress(),
415 );
416 for (job_id, job) in &database_checkpoint_control.independent_checkpoint_job_controls {
418 if let Some(p) = job.gen_backfill_progress() {
419 progress.insert(*job_id, p);
420 }
421 }
422 }
423 progress
424 }
425
426 pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
427 let mut progress = Vec::new();
428 for status in self.databases.values() {
429 let Some(database_checkpoint_control) = status.running_state() else {
430 continue;
431 };
432 progress.extend(
433 database_checkpoint_control
434 .database_info
435 .gen_fragment_backfill_progress(),
436 );
437 for job in database_checkpoint_control
438 .independent_checkpoint_job_controls
439 .values()
440 {
441 progress.extend(job.gen_fragment_backfill_progress());
442 }
443 }
444 progress
445 }
446
447 pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
448 let mut progress = HashMap::new();
449 for status in self.databases.values() {
450 let Some(database_checkpoint_control) = status.running_state() else {
451 continue;
452 };
453 progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
455 }
456 progress
457 }
458
459 pub(crate) fn databases_failed_at_worker_err(
460 &mut self,
461 worker_id: WorkerId,
462 ) -> impl Iterator<Item = DatabaseId> + '_ {
463 self.databases
464 .iter_mut()
465 .filter_map(
466 move |(database_id, database_status)| match database_status {
467 DatabaseCheckpointControlStatus::Running(control) => {
468 if !control.is_valid_after_worker_err(worker_id) {
469 Some(*database_id)
470 } else {
471 None
472 }
473 }
474 DatabaseCheckpointControlStatus::Recovering(state) => {
475 if !state.is_valid_after_worker_err(worker_id) {
476 Some(*database_id)
477 } else {
478 None
479 }
480 }
481 },
482 )
483 }
484
485 pub(crate) fn get_batch_refresh_trigger_info(
488 &self,
489 database_id: DatabaseId,
490 job_id: JobId,
491 ) -> u64 {
492 let database = self
493 .databases
494 .get(&database_id)
495 .and_then(|s| s.running_state())
496 .expect("database should be running for batch refresh trigger");
497 database.get_batch_refresh_trigger_info(job_id)
498 }
499
500 pub(crate) fn start_batch_refresh_run(
501 &mut self,
502 database_id: DatabaseId,
503 job_id: JobId,
504 context: &BatchRefreshJobTriggerContext,
505 worker_nodes: &HashMap<WorkerId, WorkerNode>,
506 actor_id_counter: &AtomicU32,
507 partial_graph_manager: &mut PartialGraphManager,
508 ) -> MetaResult<bool> {
509 let database = self
510 .databases
511 .get_mut(&database_id)
512 .and_then(|s| s.running_state_mut())
513 .expect("database should be running");
514 database.start_batch_refresh_run(
515 job_id,
516 context,
517 worker_nodes,
518 actor_id_counter,
519 partial_graph_manager,
520 )
521 }
522
523 pub(crate) fn apply_batch_refresh_fragment_infos(
524 &mut self,
525 database_id: DatabaseId,
526 job_id: JobId,
527 ) {
528 let database = self
529 .databases
530 .get_mut(&database_id)
531 .and_then(|s| s.running_state_mut())
532 .expect("database should be running");
533 let br_job = match database
534 .independent_checkpoint_job_controls
535 .get(&job_id)
536 .expect("job should exist")
537 {
538 IndependentCheckpointJobControl::BatchRefresh(job) => job,
539 _ => panic!("expected batch refresh job"),
540 };
541 if let Some(fragment_infos) = br_job.fragment_infos() {
542 database
543 .database_info
544 .shared_actor_infos
545 .upsert(database_id, fragment_infos.values().map(|f| (f, job_id)));
546 }
547 }
548}
549
/// Event yielded by `CheckpointControl::next_event`.
pub(crate) enum CheckpointControlEvent<'a> {
    /// A recovering database produced `RecoveringStateAction::EnterInitializing`.
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// A recovering database produced `RecoveringStateAction::EnterRunning`.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
    /// A batch refresh job of a running database reported, via `should_start_refresh` on the
    /// database's committed epoch, that a refresh should start.
    BatchRefreshTrigger {
        /// Database that owns the job.
        database_id: DatabaseId,
        /// The batch refresh job to trigger.
        job_id: JobId,
    },
}
560
561impl CheckpointControl {
562 pub(crate) fn on_partial_graph_reset(
563 &mut self,
564 partial_graph_id: PartialGraphId,
565 reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
566 ) {
567 let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
568 match self.databases.get_mut(&database_id).expect("should exist") {
569 DatabaseCheckpointControlStatus::Running(database) => {
570 if let Some(independent_job_id) = independent_job_id {
571 match database
572 .independent_checkpoint_job_controls
573 .remove(&independent_job_id)
574 {
575 Some(independent_job) => {
576 independent_job.on_partial_graph_reset();
577 }
578 None => {
579 if cfg!(debug_assertions) {
580 panic!(
581 "receive reset partial graph resp on non-existing independent job {independent_job_id} in database {database_id}"
582 )
583 }
584 warn!(
585 %database_id,
586 %independent_job_id,
587 "ignore reset partial graph resp on non-existing independent job on running database"
588 );
589 }
590 }
591 } else {
592 unreachable!("should not receive reset database resp when database running")
593 }
594 }
595 DatabaseCheckpointControlStatus::Recovering(state) => {
596 state.on_partial_graph_reset(partial_graph_id, reset_resps);
597 }
598 }
599 }
600
601 pub(crate) fn on_partial_graph_initialized(
602 &mut self,
603 partial_graph_id: PartialGraphId,
604 partial_graph_manager: &mut PartialGraphManager,
605 ) -> MetaResult<()> {
606 let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
607 match self.databases.get_mut(&database_id).expect("should exist") {
608 DatabaseCheckpointControlStatus::Running(database) => {
609 let Some(independent_job_id) = independent_job_id else {
610 unreachable!("database partial graph should not initialize when running")
611 };
612 let job = database
613 .independent_checkpoint_job_controls
614 .get_mut(&independent_job_id)
615 .expect("independent job should exist");
616 match job {
617 IndependentCheckpointJobControl::BatchRefresh(job) => {
618 job.on_log_store_initialized(partial_graph_manager)
619 }
620 IndependentCheckpointJobControl::CreatingStreamingJob(_) => {
621 unreachable!("creating streaming job should not initialize when running")
622 }
623 }
624 }
625 DatabaseCheckpointControlStatus::Recovering(state) => {
626 state.partial_graph_initialized(partial_graph_id);
627 Ok(())
628 }
629 }
630 }
631
/// Returns a future that resolves to the next `CheckpointControlEvent`.
///
/// On each poll the databases are scanned in map iteration order:
/// * a running database with a committed epoch yields `BatchRefreshTrigger` for the first
///   batch refresh job whose `should_start_refresh(committed_epoch)` is true;
/// * a recovering database is polled through `poll_next_event`, and a ready action is
///   turned into `EnteringInitializing` or `EnteringRunning`.
///
/// If nothing is ready the future is pending.
///
/// # Panics
///
/// Panics if the future is polled again after it has returned `Poll::Ready`.
pub(crate) fn next_event(
    &mut self,
) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
    // `self` is kept in an `Option` so that the exclusive borrow can be moved out (via
    // `take`) into the returned event once the future is ready.
    let mut this = Some(self);
    poll_fn(move |cx| {
        let Some(this_mut) = this.as_mut() else {
            unreachable!("should not be polled after poll ready")
        };
        for (&database_id, database_status) in &mut this_mut.databases {
            match database_status {
                DatabaseCheckpointControlStatus::Running(database) => {
                    if let Some(committed_epoch) = database.committed_epoch {
                        for (job_id, job) in &database.independent_checkpoint_job_controls {
                            if let IndependentCheckpointJobControl::BatchRefresh(br_job) = job
                                && br_job.should_start_refresh(committed_epoch)
                            {
                                // Copy the id out before giving up the borrow of `this`.
                                let job_id = *job_id;
                                // Mark the future as finished; the event itself carries no
                                // borrow of `self`.
                                let _ = this.take().expect("checked Some");
                                return Poll::Ready(
                                    CheckpointControlEvent::BatchRefreshTrigger {
                                        database_id,
                                        job_id,
                                    },
                                );
                            }
                        }
                    }
                }
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    let poll_result = state.poll_next_event(cx);
                    if let Poll::Ready(action) = poll_result {
                        // Move the exclusive borrow into the status action of the event.
                        let this = this.take().expect("checked Some");
                        return Poll::Ready(match action {
                            RecoveringStateAction::EnterInitializing(reset_workers) => {
                                CheckpointControlEvent::EnteringInitializing(
                                    this.new_database_status_action(
                                        database_id,
                                        EnterInitializing(reset_workers),
                                    ),
                                )
                            }
                            RecoveringStateAction::EnterRunning => {
                                CheckpointControlEvent::EnteringRunning(
                                    this.new_database_status_action(database_id, EnterRunning),
                                )
                            }
                        });
                    }
                }
            }
        }
        // NOTE(review): `cx` is only passed to `poll_next_event` of recovering databases; the
        // running branch registers no waker here -- confirm that callers re-poll as needed.
        Poll::Pending
    })
}
687}
688
/// Status of one database inside `CheckpointControl`.
pub(crate) enum DatabaseCheckpointControlStatus {
    /// The database is running and owns a `DatabaseCheckpointControl`.
    Running(DatabaseCheckpointControl),
    /// The database is recovering; its progress is tracked by a `DatabaseRecoveringState`.
    Recovering(DatabaseRecoveringState),
}
693
694impl DatabaseCheckpointControlStatus {
695 fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
696 match self {
697 DatabaseCheckpointControlStatus::Running(state) => Some(state),
698 DatabaseCheckpointControlStatus::Recovering(_) => None,
699 }
700 }
701
702 fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
703 match self {
704 DatabaseCheckpointControlStatus::Running(state) => Some(state),
705 DatabaseCheckpointControlStatus::Recovering(_) => None,
706 }
707 }
708
709 fn may_have_snapshot_backfilling_jobs(&self) -> bool {
710 self.running_state()
711 .map(|database| {
712 database
713 .independent_checkpoint_job_controls
714 .values()
715 .any(|job| job.is_snapshot_backfilling())
716 })
717 .unwrap_or(true) }
719
720 fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
721 match self {
722 DatabaseCheckpointControlStatus::Running(state) => state,
723 DatabaseCheckpointControlStatus::Recovering(_) => {
724 panic!("should be at running: {}", reason)
725 }
726 }
727 }
728}
729
/// Barrier metrics of a single database, each labelled with the database id.
pub(in crate::barrier) struct DatabaseCheckpointControlMetrics {
    /// Histogram fed with barrier latencies in seconds.
    barrier_latency: LabelGuardedHistogram,
    /// Gauge set to the number of in-flight barriers.
    in_flight_barrier_nums: LabelGuardedIntGauge,
    /// Gauge set to the number of in-flight plus collected barriers.
    all_barrier_nums: LabelGuardedIntGauge,
}
735
736impl DatabaseCheckpointControlMetrics {
737 pub(in crate::barrier) fn new(database_id: DatabaseId) -> Self {
738 let database_id_str = database_id.to_string();
739 let barrier_latency = GLOBAL_META_METRICS
740 .barrier_latency
741 .with_guarded_label_values(&[&database_id_str]);
742 let in_flight_barrier_nums = GLOBAL_META_METRICS
743 .in_flight_barrier_nums
744 .with_guarded_label_values(&[&database_id_str]);
745 let all_barrier_nums = GLOBAL_META_METRICS
746 .all_barrier_nums
747 .with_guarded_label_values(&[&database_id_str]);
748 Self {
749 barrier_latency,
750 in_flight_barrier_nums,
751 all_barrier_nums,
752 }
753 }
754}
755
756impl PartialGraphStat for DatabaseCheckpointControlMetrics {
757 fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
758 self.barrier_latency.observe(barrier_latency_secs);
759 }
760
761 fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize) {
762 self.in_flight_barrier_nums.set(inflight_barrier_num as _);
763 self.all_barrier_nums
764 .set((inflight_barrier_num + collected_barrier_num) as _);
765 }
766}
767
/// Checkpoint control state of a single running database.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    /// Id of the database this control belongs to.
    pub(super) database_id: DatabaseId,
    /// Partial graph id of the database itself, built as `to_partial_graph_id(database_id, None)`.
    partial_graph_id: PartialGraphId,
    /// Barrier worker state of the database.
    pub(super) state: BarrierWorkerState,

    /// Collects, per epoch, the barrier responses and tracking job of each independent job
    /// that the epoch waits on.
    finishing_jobs_collector:
        BarrierItemCollector<JobId, (Vec<BarrierCompleteResponse>, TrackingJob), ()>,
    /// Initialised to `None`.
    /// NOTE(review): presumably the epoch of the barrier currently being completed -- confirm
    /// against the code that sets it.
    completing_barrier: Option<u64>,

    /// Last committed epoch; `None` for a freshly created database and `Some` after recovery.
    committed_epoch: Option<u64>,

    /// In-flight fragment/job info of the database.
    pub(super) database_info: InflightDatabaseInfo,
    /// Independent checkpoint jobs of this database, keyed by job id.
    pub independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
}
785
786impl DatabaseCheckpointControl {
787 fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
788 Self {
789 database_id,
790 partial_graph_id: to_partial_graph_id(database_id, None),
791 state: BarrierWorkerState::new(),
792 finishing_jobs_collector: BarrierItemCollector::new(),
793 completing_barrier: None,
794 committed_epoch: None,
795 database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
796 independent_checkpoint_job_controls: Default::default(),
797 }
798 }
799
800 pub(crate) fn recovery(
801 database_id: DatabaseId,
802 state: BarrierWorkerState,
803 committed_epoch: u64,
804 database_info: InflightDatabaseInfo,
805 independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
806 ) -> Self {
807 Self {
808 database_id,
809 partial_graph_id: to_partial_graph_id(database_id, None),
810 state,
811 finishing_jobs_collector: BarrierItemCollector::new(),
812 completing_barrier: None,
813 committed_epoch: Some(committed_epoch),
814 database_info,
815 independent_checkpoint_job_controls,
816 }
817 }
818
819 pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
820 !self.database_info.contains_worker(worker_id as _)
821 && self
822 .independent_checkpoint_job_controls
823 .values()
824 .all(|job| {
825 job.fragment_infos()
826 .map(|fragment_infos| {
827 !InflightFragmentInfo::contains_worker(
828 fragment_infos.values(),
829 worker_id,
830 )
831 })
832 .unwrap_or(true)
833 })
834 }
835
836 fn enqueue_command(&mut self, epoch: EpochPair, independent_jobs_to_wait: HashSet<JobId>) {
838 let prev_epoch = epoch.prev;
839 tracing::trace!(prev_epoch, ?independent_jobs_to_wait, "enqueue command");
840 if !independent_jobs_to_wait.is_empty() {
841 self.finishing_jobs_collector
842 .enqueue(epoch, independent_jobs_to_wait, ());
843 }
844 }
845
846 fn barrier_collected(
849 &mut self,
850 partial_graph_id: PartialGraphId,
851 collected_barrier: CollectedBarrier<'_>,
852 periodic_barriers: &mut PeriodicBarriers,
853 ) -> MetaResult<()> {
854 let prev_epoch = collected_barrier.epoch.prev;
855 tracing::trace!(
856 prev_epoch,
857 partial_graph_id = %partial_graph_id,
858 "barrier collected"
859 );
860 let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
861 assert_eq!(self.database_id, database_id);
862 if let Some(independent_job_id) = independent_job_id {
863 let job = self
864 .independent_checkpoint_job_controls
865 .get_mut(&independent_job_id)
866 .expect("should exist");
867 let should_force_checkpoint = job.collect(collected_barrier);
868 if should_force_checkpoint {
869 periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
870 }
871 }
872 Ok(())
873 }
874}
875
876impl DatabaseCheckpointControl {
877 fn collect_backfill_pinned_upstream_log_epoch(
879 &self,
880 ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
881 self.independent_checkpoint_job_controls
882 .iter()
883 .map(|(job_id, job)| (*job_id, job.pinned_upstream_log_epoch()))
884 .collect()
885 }
886
887 fn collect_no_shuffle_fragment_relations_for_reschedule_check(
888 &self,
889 ) -> Vec<(FragmentId, FragmentId)> {
890 let mut no_shuffle_relations = Vec::new();
891 for fragment in self.database_info.fragment_infos() {
892 let downstream_fragment_id = fragment.fragment_id;
893 visit_stream_node_cont(&fragment.nodes, |node| {
894 if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
895 && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
896 {
897 no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
898 }
899 true
900 });
901 }
902
903 for job in self.independent_checkpoint_job_controls.values() {
904 if let Some(fragment_infos) = job.fragment_infos() {
905 for fragment_info in fragment_infos.values() {
906 let downstream_fragment_id = fragment_info.fragment_id;
907 visit_stream_node_cont(&fragment_info.nodes, |node| {
908 if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
909 && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
910 {
911 no_shuffle_relations
912 .push((merge.upstream_fragment_id, downstream_fragment_id));
913 }
914 true
915 });
916 }
917 }
918 }
919 no_shuffle_relations
920 }
921
922 fn collect_reschedule_blocked_jobs_for_independent_jobs_inflight(
923 &self,
924 ) -> MetaResult<HashSet<JobId>> {
925 let mut initial_blocked_fragment_ids = HashSet::new();
926 for job in self.independent_checkpoint_job_controls.values() {
927 if let Some(fragment_infos) = job.fragment_infos() {
928 for fragment_info in fragment_infos.values() {
929 if fragment_has_online_unreschedulable_scan(fragment_info) {
930 initial_blocked_fragment_ids.insert(fragment_info.fragment_id);
931 collect_fragment_upstream_fragment_ids(
932 fragment_info,
933 &mut initial_blocked_fragment_ids,
934 );
935 }
936 }
937 }
938 }
939
940 let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
941 if !initial_blocked_fragment_ids.is_empty() {
942 let no_shuffle_relations =
943 self.collect_no_shuffle_fragment_relations_for_reschedule_check();
944 let (forward_edges, backward_edges) =
945 build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
946 let initial_blocked_fragment_ids: Vec<_> =
947 initial_blocked_fragment_ids.iter().copied().collect();
948 for ensemble in find_no_shuffle_graphs(
949 &initial_blocked_fragment_ids,
950 &forward_edges,
951 &backward_edges,
952 )? {
953 blocked_fragment_ids.extend(ensemble.fragments());
954 }
955 }
956
957 let mut blocked_job_ids = HashSet::new();
958 blocked_job_ids.extend(
959 blocked_fragment_ids
960 .into_iter()
961 .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
962 );
963 Ok(blocked_job_ids)
964 }
965
966 fn collect_reschedule_blocked_job_ids(
967 &self,
968 reschedules: &HashMap<FragmentId, Reschedule>,
969 fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
970 blocked_job_ids: &HashSet<JobId>,
971 ) -> HashSet<JobId> {
972 let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
973 affected_fragment_ids.extend(fragment_actors.keys().copied());
974 for reschedule in reschedules.values() {
975 affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
976 affected_fragment_ids.extend(
977 reschedule
978 .upstream_fragment_dispatcher_ids
979 .iter()
980 .map(|(fragment_id, _)| *fragment_id),
981 );
982 }
983
984 affected_fragment_ids
985 .into_iter()
986 .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
987 .filter(|job_id| blocked_job_ids.contains(job_id))
988 .collect()
989 }
990
/// Gathers everything of this database that is ready to be completed and folds it into `task`.
///
/// The method works in three stages:
/// 1. If the database already has a committed epoch, every independent checkpoint job is asked
///    (via its own `start_completing`) for an epoch to complete. Creating jobs that report their
///    finish epoch are removed from `independent_checkpoint_job_controls` and handed to
///    `finishing_jobs_collector`; all other results are buffered in `independent_jobs_task`.
/// 2. The database's own partial graph is asked for a barrier to complete. When one is returned,
///    its commit info, finished jobs and epoch info are written into `task`, and
///    `completing_barrier` is set to the barrier's `prev_epoch`.
/// 3. The buffered independent-job results are written into `task`.
///
/// `task` is only materialised (`get_or_insert_default`) when there is something to put in it,
/// so it may remain `None` after this call.
///
/// # Panics
/// Panics if a finish-epoch result still carries notifiers, if a finished job is not a
/// `CreatingStreamingJob`, if the finishing-jobs collector holds an epoch older than the one
/// being completed, or if an entry for the same partial graph already exists in
/// `task.epoch_infos`.
fn next_complete_barrier_task(
    &mut self,
    periodic_barriers: &mut PeriodicBarriers,
    partial_graph_manager: &mut PartialGraphManager,
    task: &mut Option<CompleteBarrierTask>,
    hummock_version_stats: &HummockVersionStats,
) {
    // `(job_id, epoch, resps, info)` of independent jobs; folded into `task` at the very end.
    let mut independent_jobs_task = vec![];
    // Independent jobs are only polled once this database has a committed epoch.
    if let Some(committed_epoch) = self.committed_epoch {
        // `(job_id, epoch, resps)` of creating jobs whose result is flagged as the finish epoch.
        let mut finished_jobs = Vec::new();
        // `prev` of the epoch pair returned by `first_inflight_barrier` for the database's own
        // partial graph, or `None` when it returns nothing.
        let min_upstream_inflight_barrier = partial_graph_manager
            .first_inflight_barrier(self.partial_graph_id)
            .map(|epoch| epoch.prev);
        for (job_id, job) in &mut self.independent_checkpoint_job_controls {
            match job {
                IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) => {
                    if let Some((epoch, resps, info, is_finish_epoch)) = creating_job
                        .start_completing(
                            partial_graph_manager,
                            min_upstream_inflight_barrier,
                            committed_epoch,
                        )
                    {
                        let resps = resps.into_values().collect_vec();
                        if is_finish_epoch {
                            // A finish-epoch result is not added to `independent_jobs_task`;
                            // its responses are routed through `finishing_jobs_collector` below.
                            assert!(info.notifiers.is_empty());
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        };
                        independent_jobs_task.push((*job_id, epoch, resps, info));
                    }
                }
                IndependentCheckpointJobControl::BatchRefresh(batch_refresh_job) => {
                    if let Some((epoch, resps, info, tracking_job)) =
                        batch_refresh_job.start_completing(partial_graph_manager)
                    {
                        let resps = resps.into_values().collect_vec();
                        // When a tracking job is returned it is reported as finished right away.
                        if let Some(tracking_job) = tracking_job {
                            let task = task.get_or_insert_default();
                            task.finished_jobs.push(tracking_job);
                        }
                        independent_jobs_task.push((*job_id, epoch, resps, info));
                    }
                }
            }
        }
        // Drop the partial graphs of all finished creating jobs in a single call.
        if !finished_jobs.is_empty() {
            partial_graph_manager.remove_partial_graphs(
                finished_jobs
                    .iter()
                    .map(|(job_id, ..)| to_partial_graph_id(self.database_id, Some(*job_id)))
                    .collect(),
            );
        }
        for (job_id, epoch, resps) in finished_jobs {
            debug!(epoch, %job_id, "finish creating job");
            // The job control is removed here, so the job is no longer visited by later calls.
            let Some(IndependentCheckpointJobControl::CreatingStreamingJob(
                creating_streaming_job,
            )) = self.independent_checkpoint_job_controls.remove(&job_id)
            else {
                panic!("finished job {job_id} should be a creating streaming job");
            };
            let tracking_job = creating_streaming_job.into_tracking_job();
            // Park the responses and tracking job under `epoch`; they are picked up further
            // down when the database barrier with the matching epoch is completed.
            self.finishing_jobs_collector
                .collect(epoch, job_id, (resps, tracking_job));
        }
    }
    // Set to `true` whenever the callback passed to `start_completing` below is invoked.
    // NOTE(review): judging by the name, the callback runs for non-checkpoint barriers that are
    // passed over — confirm against `PartialGraphManager::start_completing`.
    let mut observed_non_checkpoint = false;
    self.finishing_jobs_collector.advance_collected();
    // Upper bound handed to `start_completing`: unbounded when the collector reports no
    // in-flight epoch, otherwise exclusive of that epoch's `prev`.
    let epoch_end_bound = self
        .finishing_jobs_collector
        .first_inflight_epoch()
        .map_or(Unbounded, |epoch| Excluded(epoch.prev));
    if let Some((epoch, resps, info)) = partial_graph_manager.start_completing(
        self.partial_graph_id,
        epoch_end_bound,
        |_, resps, post_collect_command| {
            observed_non_checkpoint = true;
            self.handle_refresh_table_info(task, &resps);
            self.database_info.apply_collected_command(
                &post_collect_command,
                &resps,
                hummock_version_stats,
            );
        },
    ) {
        // Same bookkeeping as in the callback above, now for the barrier being completed.
        self.handle_refresh_table_info(task, &resps);
        self.database_info.apply_collected_command(
            &info.post_collect_command,
            &resps,
            hummock_version_stats,
        );
        let mut resps_to_commit = resps.into_values().collect_vec();
        let mut staging_commit_info = self.database_info.take_staging_commit_info();
        // Take the parked finished jobs only when their epoch's `prev` equals the epoch being
        // completed; a parked epoch older than the completing one is treated as a bug.
        if let Some((_, finished_jobs, _)) =
            self.finishing_jobs_collector
                .take_collected_if(|collected_epoch| {
                    assert!(epoch <= collected_epoch.prev);
                    epoch == collected_epoch.prev
                })
        {
            // Their responses are committed together with the database barrier, and their
            // tracking jobs are reported as finished.
            finished_jobs
                .into_iter()
                .for_each(|(_, (resps, tracking_job))| {
                    resps_to_commit.extend(resps);
                    staging_commit_info.finished_jobs.push(tracking_job);
                });
        }
        {
            let task = task.get_or_insert_default();
            Command::collect_commit_epoch_info(
                &self.database_info,
                &info,
                &mut task.commit_info,
                resps_to_commit,
                self.collect_backfill_pinned_upstream_log_epoch(),
            );
            // Remembered until `ack_completed` takes it.
            self.completing_barrier = Some(info.barrier_info.prev_epoch());
            task.finished_jobs.extend(staging_commit_info.finished_jobs);
            task.finished_cdc_table_backfill
                .extend(staging_commit_info.finished_cdc_table_backfill);
            task.epoch_infos
                .try_insert(self.partial_graph_id, info)
                .expect("non duplicate");
            task.commit_info
                .truncate_tables
                .extend(staging_commit_info.table_ids_to_truncate);
        }
    } else if observed_non_checkpoint
        && self.database_info.has_pending_finished_jobs()
        && !partial_graph_manager.has_pending_checkpoint_barrier(self.partial_graph_id)
    {
        // Nothing was returned for completion, the callback ran at least once, finished jobs
        // are pending and no checkpoint barrier is pending: request a checkpoint on the next
        // barrier of this database.
        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
    }
    if !independent_jobs_task.is_empty() {
        let task = task.get_or_insert_default();
        for (job_id, epoch, resps, info) in independent_jobs_task {
            collect_independent_job_commit_epoch_info(
                &mut task.commit_info,
                epoch,
                resps,
                &info,
            );
            // Each independent job is recorded under its own partial graph id.
            task.epoch_infos
                .try_insert(to_partial_graph_id(self.database_id, Some(job_id)), info)
                .expect("non duplicate");
        }
    }
}
1144
1145 fn ack_completed(
1146 &mut self,
1147 partial_graph_manager: &mut PartialGraphManager,
1148 command_prev_epoch: Option<u64>,
1149 independent_job_epochs: Vec<(JobId, u64)>,
1150 ) {
1151 {
1152 if let Some(prev_epoch) = self.completing_barrier.take() {
1153 assert_eq!(command_prev_epoch, Some(prev_epoch));
1154 self.committed_epoch = Some(prev_epoch);
1155 partial_graph_manager.ack_completed(self.partial_graph_id, prev_epoch);
1156 } else {
1157 assert_eq!(command_prev_epoch, None);
1158 };
1159 for (job_id, epoch) in independent_job_epochs {
1160 if let Some(job) = self.independent_checkpoint_job_controls.get_mut(&job_id) {
1161 job.ack_completed(partial_graph_manager, epoch);
1162 }
1163 }
1166 }
1167 }
1168
1169 fn handle_refresh_table_info(
1170 &self,
1171 task: &mut Option<CompleteBarrierTask>,
1172 resps: &HashMap<WorkerId, BarrierCompleteResponse>,
1173 ) {
1174 let list_finished_info = resps
1175 .values()
1176 .flat_map(|resp| resp.list_finished_sources.clone())
1177 .collect::<Vec<_>>();
1178 if !list_finished_info.is_empty() {
1179 let task = task.get_or_insert_default();
1180 task.list_finished_source_ids.extend(list_finished_info);
1181 }
1182
1183 let load_finished_info = resps
1184 .values()
1185 .flat_map(|resp| resp.load_finished_sources.clone())
1186 .collect::<Vec<_>>();
1187 if !load_finished_info.is_empty() {
1188 let task = task.get_or_insert_default();
1189 task.load_finished_source_ids.extend(load_finished_info);
1190 }
1191
1192 let refresh_finished_table_ids: Vec<JobId> = resps
1193 .values()
1194 .flat_map(|resp| {
1195 resp.refresh_finished_tables
1196 .iter()
1197 .map(|table_id| table_id.as_job_id())
1198 })
1199 .collect::<Vec<_>>();
1200 if !refresh_finished_table_ids.is_empty() {
1201 let task = task.get_or_insert_default();
1202 task.refresh_finished_table_job_ids
1203 .extend(refresh_finished_table_ids);
1204 }
1205 }
1206}
1207
1208impl DatabaseCheckpointControl {
/// Validates an optional command and, unless it is rejected or short-circuited, turns it into
/// the next barrier of this database.
///
/// The command goes through a series of checks. Each of the following cases ends the call
/// early with `Ok(())` after notifying the notifiers, without creating a barrier:
/// - a multi-job `DropStreamingJobs` that includes an independent checkpoint job (start failed);
/// - a single-job `DropStreamingJobs` targeting an independent checkpoint job whose `drop`
///   reports `true`;
/// - a multi-job `Throttle` that includes an independent checkpoint job (start failed);
/// - a resolved `RescheduleIntent` touching jobs that are blocked while independent jobs are
///   in flight (start failed);
/// - any command other than `CreateStreamingJob` while `database_info` is empty (notified as
///   started and collected);
/// - a `SnapshotBackfill` / `BatchRefresh` `CreateStreamingJob` while the state is paused
///   (start failed).
///
/// Otherwise the next barrier info is built, the command is applied via `apply_command`, and
/// the result is enqueued with `enqueue_command`.
///
/// # Errors
/// Returns the error of `collect_reschedule_blocked_jobs_for_independent_jobs_inflight`, or the
/// error of `apply_command` (after all remaining notifiers were told the start failed).
///
/// # Panics
/// Panics if `database_info` is empty while independent checkpoint jobs still exist, or if
/// `apply_command` succeeds but leaves notifiers behind. In debug builds it also panics on an
/// unresolved `RescheduleIntent`.
fn handle_new_barrier(
    &mut self,
    command: Option<(Command, Vec<Notifier>)>,
    checkpoint: bool,
    span: tracing::Span,
    partial_graph_manager: &mut PartialGraphManager,
    hummock_version_stats: &HummockVersionStats,
    worker_nodes: &HashMap<WorkerId, WorkerNode>,
) -> MetaResult<()> {
    let curr_epoch = self.state.in_flight_prev_epoch().next();

    // Split the optional pair so the command and its notifiers can be handled separately.
    let (command, mut notifiers) = if let Some((command, notifiers)) = command {
        (Some(command), notifiers)
    } else {
        (None, vec![])
    };

    debug_assert!(
        !matches!(
            &command,
            Some(Command::RescheduleIntent {
                reschedule_plan: None,
                ..
            })
        ),
        "reschedule intent should be resolved before injection"
    );

    if let Some(Command::DropStreamingJobs {
        streaming_job_ids, ..
    }) = &command
    {
        if streaming_job_ids.len() > 1 {
            // Dropping several jobs at once is rejected as soon as one of them is an
            // independent checkpoint job.
            for job_to_cancel in streaming_job_ids {
                if self
                    .independent_checkpoint_job_controls
                    .contains_key(job_to_cancel)
                {
                    warn!(
                        job_id = %job_to_cancel,
                        "ignore multi-job cancel command on creating snapshot backfill streaming job"
                    );
                    for notifier in notifiers {
                        notifier
                            .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
                            the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
                    }
                    return Ok(());
                }
            }
        } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
            && let Some(job) = self
                .independent_checkpoint_job_controls
                .get_mut(job_to_drop)
        {
            // Single-job drop of an independent checkpoint job: let the job handle it. When it
            // reports `true` no barrier is created here; otherwise fall through.
            let dropped = job.drop(&mut notifiers, partial_graph_manager);
            if dropped {
                return Ok(());
            }
        }
    }

    // A throttle covering several jobs is rejected when one of them is an independent
    // checkpoint job.
    if let Some(Command::Throttle { jobs, .. }) = &command
        && jobs.len() > 1
        && let Some(independent_job_id) = jobs
            .iter()
            .find(|job| self.independent_checkpoint_job_controls.contains_key(*job))
    {
        warn!(
            job_id = %independent_job_id,
            "ignore multi-job throttle command on independent checkpoint job"
        );
        for notifier in notifiers {
            notifier.notify_start_failed(
                anyhow!(
                    "cannot alter rate limit for independent checkpoint job with other jobs, \
                    the original rate limit will be kept during recovery."
                )
                .into(),
            );
        }
        return Ok(());
    };

    // A resolved reschedule is only checked for blocked jobs while independent checkpoint jobs
    // exist.
    if let Some(Command::RescheduleIntent {
        reschedule_plan: Some(reschedule_plan),
        ..
    }) = &command
        && !self.independent_checkpoint_job_controls.is_empty()
    {
        let blocked_job_ids =
            self.collect_reschedule_blocked_jobs_for_independent_jobs_inflight()?;
        let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
            &reschedule_plan.reschedules,
            &reschedule_plan.fragment_actors,
            &blocked_job_ids,
        );
        if !blocked_reschedule_job_ids.is_empty() {
            warn!(
                blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
                "reject reschedule fragments related to creating unreschedulable backfill jobs"
            );
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
                        blocked_reschedule_job_ids
                    )
                    .into(),
                );
            }
            return Ok(());
        }
    }

    // With an empty `database_info`, only `CreateStreamingJob` produces a barrier; any other
    // command (or no command) is acknowledged immediately.
    if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
        && self.database_info.is_empty()
    {
        assert!(
            self.independent_checkpoint_job_controls.is_empty(),
            "should not have snapshot backfill job when there is no normal job in database"
        );
        for mut notifier in notifiers {
            notifier.notify_started();
            notifier.notify_collected();
        }
        return Ok(());
    };

    // Snapshot-backfill and batch-refresh job creation is refused while the state is paused.
    if let Some(Command::CreateStreamingJob {
        job_type:
            CreateStreamingJobType::SnapshotBackfill(_) | CreateStreamingJobType::BatchRefresh(_),
        ..
    }) = &command
        && self.state.is_paused()
    {
        warn!("cannot create streaming job with snapshot backfill when paused");
        for notifier in notifiers {
            notifier.notify_start_failed(
                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                    .into(),
            );
        }
        return Ok(());
    }

    // All checks passed: build the barrier info for `curr_epoch`.
    let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
    // The event is emitted inside the span attached to the barrier's previous epoch.
    barrier_info.prev_epoch.span().in_scope(|| {
        tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
    });
    span.record("epoch", barrier_info.curr_epoch());

    // Read the epoch before `barrier_info` is moved into `apply_command`.
    let epoch = barrier_info.epoch();
    let ApplyCommandInfo { jobs_to_wait } = match self.apply_command(
        command,
        &mut notifiers,
        barrier_info,
        partial_graph_manager,
        hummock_version_stats,
        worker_nodes,
    ) {
        Ok(info) => {
            // On success `apply_command` is expected to have drained every notifier.
            assert!(notifiers.is_empty());
            info
        }
        Err(err) => {
            // Whatever notifiers are still left learn about the failure before it propagates.
            for notifier in notifiers {
                notifier.notify_start_failed(err.clone());
            }
            fail_point!("inject_barrier_err_success");
            return Err(err);
        }
    };

    self.enqueue_command(epoch, jobs_to_wait);

    Ok(())
}
1391
1392 pub(crate) fn get_batch_refresh_trigger_info(&self, job_id: JobId) -> u64 {
1396 let job = self
1397 .independent_checkpoint_job_controls
1398 .get(&job_id)
1399 .expect("batch refresh job should exist");
1400 match job {
1401 IndependentCheckpointJobControl::BatchRefresh(br_job) => br_job
1402 .last_committed_epoch()
1403 .expect("idle job must have a last_committed_epoch"),
1404 _ => panic!("job {} should be a batch refresh job", job_id),
1405 }
1406 }
1407
1408 pub(crate) fn start_batch_refresh_run(
1412 &mut self,
1413 job_id: JobId,
1414 context: &BatchRefreshJobTriggerContext,
1415 worker_nodes: &HashMap<WorkerId, WorkerNode>,
1416 actor_id_counter: &AtomicU32,
1417 partial_graph_manager: &mut PartialGraphManager,
1418 ) -> MetaResult<bool> {
1419 let job = self
1420 .independent_checkpoint_job_controls
1421 .get_mut(&job_id)
1422 .expect("batch refresh job should exist");
1423 match job {
1424 IndependentCheckpointJobControl::BatchRefresh(br_job) => br_job.start_refresh_run(
1425 context,
1426 worker_nodes,
1427 actor_id_counter,
1428 partial_graph_manager,
1429 ),
1430 _ => panic!("job {} should be a batch refresh job", job_id),
1431 }
1432 }
1433}