1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::ops::Bound::{Excluded, Unbounded};
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use itertools::Itertools;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
27use risingwave_common::system_param::AdaptiveParallelismStrategy;
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
30use risingwave_meta_model::WorkerId;
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::hummock::HummockVersionStats;
33use risingwave_pb::id::{FragmentId, PartialGraphId};
34use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_service::BarrierCompleteResponse;
37use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
38use tracing::{debug, warn};
39
40use crate::barrier::cdc_progress::CdcProgress;
41use crate::barrier::checkpoint::independent_job::IndependentCheckpointJobControl;
42use crate::barrier::checkpoint::recovery::{
43 DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
44 RecoveringStateAction,
45};
46use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
47use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
48use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
49use crate::barrier::notifier::Notifier;
50use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphStat};
51use crate::barrier::progress::TrackingJob;
52use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
53use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
54use crate::barrier::utils::{BarrierItemCollector, collect_independent_job_commit_epoch_info};
55use crate::barrier::{
56 BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
57};
58use crate::controller::fragment::InflightFragmentInfo;
59use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
60use crate::manager::MetaSrvEnv;
61
62fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
63 let mut has_unreschedulable_scan = false;
64 visit_stream_node_cont(&fragment.nodes, |node| {
65 if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
66 let scan_type = stream_scan.stream_scan_type();
67 if !scan_type.is_reschedulable(true) {
68 has_unreschedulable_scan = true;
69 return false;
70 }
71 }
72 true
73 });
74 has_unreschedulable_scan
75}
76
77fn collect_fragment_upstream_fragment_ids(
78 fragment: &InflightFragmentInfo,
79 upstream_fragment_ids: &mut HashSet<FragmentId>,
80) {
81 visit_stream_node_cont(&fragment.nodes, |node| {
82 if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
83 upstream_fragment_ids.insert(merge.upstream_fragment_id);
84 }
85 true
86 });
87}
88
89use crate::model::ActorId;
90use crate::rpc::metrics::GLOBAL_META_METRICS;
91use crate::{MetaError, MetaResult};
92
/// Top-level coordinator of barrier checkpointing across all databases.
///
/// Holds one [`DatabaseCheckpointControlStatus`] per database (either running
/// or recovering) plus the latest hummock version stats observed from
/// completed barriers.
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    // Per-database checkpoint state.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    // Refreshed on every `ack_completed`.
    pub(super) hummock_version_stats: HummockVersionStats,
    // Per-database cap on concurrently in-flight barriers, copied from
    // `env.opts.in_flight_barrier_nums` at construction.
    pub(crate) in_flight_barrier_nums: usize,
}
100
impl CheckpointControl {
    /// Create an empty control with no databases (fresh start).
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    /// Rebuild the control after recovery.
    ///
    /// `databases` resume as `Running`; each entry of `failed_databases`
    /// re-enters the recovering state machine with the set of partial graphs
    /// that still need to be reset.
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        // Drop shared actor info of databases that no longer exist.
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(
                    |(database_id, resetting_partial_graphs)| {
                        (
                            database_id,
                            DatabaseCheckpointControlStatus::Recovering(
                                DatabaseRecoveringState::new_resetting(
                                    database_id,
                                    resetting_partial_graphs,
                                ),
                            ),
                        )
                    },
                ))
                .collect(),
            hummock_version_stats,
        }
    }

    /// Acknowledge a finished complete-barrier task: refresh the version
    /// stats, then forward each database's acked epochs so it can advance its
    /// committed epoch. Panics if an affected database is not running.
    pub(crate) fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        output: BarrierCompleteOutput,
    ) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, independent_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(
                    partial_graph_manager,
                    command_prev_epoch,
                    independent_job_epochs,
                );
        }
    }

    /// Gather the next batch of completable barriers from all running
    /// databases into a single [`CompleteBarrierTask`], if any is ready.
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                // Recovering databases do not contribute to completion.
                continue;
            };
            database.next_complete_barrier_task(
                periodic_barriers,
                partial_graph_manager,
                &mut task,
                &self.hummock_version_stats,
            );
        }
        task
    }

    /// Handle a barrier collected from workers for the given partial graph.
    ///
    /// A barrier arriving for a recovering database is stale: panics in debug
    /// builds to surface the race early, otherwise logs and ignores it.
    pub(crate) fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(_) => {
                if cfg!(debug_assertions) {
                    panic!(
                        "receive collected barrier {:?} on recovering database {} from partial graph {}",
                        collected_barrier, database_id, partial_graph_id
                    );
                } else {
                    warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
                }
                Ok(())
            }
        }
    }

    /// Ids of databases currently in recovery.
    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    /// Ids of databases currently running normally.
    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// Inflight graph info of a database; `None` if absent or recovering.
    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
        self.databases
            .get(&database_id)
            .and_then(|database| database.running_state())
            .map(|database| &database.database_info)
    }

    /// Whether any database may still have snapshot-backfilling jobs.
    /// Recovering databases conservatively count as `true`.
    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.databases
            .values()
            .any(|database| database.may_have_snapshot_backfilling_jobs())
    }

    /// Handle a newly scheduled barrier for `database_id`.
    ///
    /// With a command attached, this resolves cross-db snapshot backfill
    /// epochs, lazily creates the database control on its first (normal)
    /// `CreateStreamingJob`, and injects the command barrier. Without a
    /// command, it injects a plain periodic barrier unless the database is
    /// absent, recovering, or already at the in-flight barrier cap.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                // Resolve each cross-db upstream table to the committed epoch
                // of the database that owns it; fail the command if any
                // upstream database cannot be found.
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                // First command on a database we have never seen: only a
                // normal `CreateStreamingJob` may bootstrap the control.
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        // Register the database-level partial graph before use.
                        let adder = partial_graph_manager.add_partial_graph(
                            to_partial_graph_id(database_id, None),
                            DatabaseCheckpointControlMetrics::new(database_id),
                        );
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    // These commands are trivially satisfied on an empty
                    // database: notify immediately and skip.
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    // All remaining commands require an existing database.
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // No such database: nothing to inject.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip periodic barriers while recovering.
                return Ok(());
            };
            // Back-pressure: don't exceed the in-flight barrier cap.
            if partial_graph_manager.inflight_barrier_num(database.partial_graph_id)
                >= self.in_flight_barrier_nums
            {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        }
    }

    /// Backfill progress of every job across all running databases,
    /// including independent checkpoint jobs.
    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            for (job_id, job) in &database_checkpoint_control.independent_checkpoint_job_controls {
                progress.extend([(*job_id, job.gen_backfill_progress())]);
            }
        }
        progress
    }

    /// Per-fragment backfill progress across all running databases,
    /// including independent checkpoint jobs.
    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        let mut progress = Vec::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_fragment_backfill_progress(),
            );
            for job in database_checkpoint_control
                .independent_checkpoint_job_controls
                .values()
            {
                progress.extend(job.gen_fragment_backfill_progress());
            }
        }
        progress
    }

    /// CDC progress of every job across all running databases.
    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

    /// Ids of databases (running or recovering) that are no longer valid
    /// after `worker_id` failed and thus need recovery handling.
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases
            .iter_mut()
            .filter_map(
                move |(database_id, database_status)| match database_status {
                    DatabaseCheckpointControlStatus::Running(control) => {
                        if !control.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        if !state.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                },
            )
    }
}
484
/// Events yielded by [`CheckpointControl::next_event`] when a recovering
/// database is ready to advance its recovery state machine.
pub(crate) enum CheckpointControlEvent<'a> {
    // The database may start initializing (its workers have been reset).
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    // The database may transition back to the normal running state.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
489
impl CheckpointControl {
    /// Handle worker acknowledgements that a partial graph has been reset.
    ///
    /// On a running database this is only legal for the partial graph of an
    /// independent job (which is removed); a reset ack for the database-level
    /// graph while running is a logic error. For a recovering database the
    /// ack is forwarded to its state machine.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(independent_job_id) = independent_job_id {
                    match database
                        .independent_checkpoint_job_controls
                        .remove(&independent_job_id)
                    {
                        Some(independent_job) => {
                            independent_job.on_partial_graph_reset();
                        }
                        None => {
                            // Unknown job: fail fast in debug, tolerate in release.
                            if cfg!(debug_assertions) {
                                panic!(
                                    "receive reset partial graph resp on non-existing independent job {independent_job_id} in database {database_id}"
                                )
                            }
                            warn!(
                                %database_id,
                                %independent_job_id,
                                "ignore reset partial graph resp on non-existing independent job on running database"
                            );
                        }
                    }
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }

    /// Notify the recovering database that one of its partial graphs has
    /// finished initializing. Only legal while recovering.
    pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not have partial graph initialized when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.partial_graph_initialized(partial_graph_id);
            }
        }
    }

    /// Future resolving to the next recovery state-machine event among all
    /// recovering databases; stays pending while none is ready.
    ///
    /// The future must not be polled again after it resolves.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        // `this` is taken out on the first `Ready` so the action can borrow
        // `self` for its full lifetime; polling afterwards is a bug.
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}
580
/// Checkpoint status of a single database: normally `Running`, switched to
/// `Recovering` after a failure until recovery completes.
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}
585
586impl DatabaseCheckpointControlStatus {
587 fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
588 match self {
589 DatabaseCheckpointControlStatus::Running(state) => Some(state),
590 DatabaseCheckpointControlStatus::Recovering(_) => None,
591 }
592 }
593
594 fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
595 match self {
596 DatabaseCheckpointControlStatus::Running(state) => Some(state),
597 DatabaseCheckpointControlStatus::Recovering(_) => None,
598 }
599 }
600
601 fn may_have_snapshot_backfilling_jobs(&self) -> bool {
602 self.running_state()
603 .map(|database| {
604 database
605 .independent_checkpoint_job_controls
606 .values()
607 .any(|job| job.is_snapshot_backfilling())
608 })
609 .unwrap_or(true) }
611
612 fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
613 match self {
614 DatabaseCheckpointControlStatus::Running(state) => state,
615 DatabaseCheckpointControlStatus::Recovering(_) => {
616 panic!("should be at running: {}", reason)
617 }
618 }
619 }
620}
621
/// Per-database metrics for barrier progress, labelled by database id.
pub(in crate::barrier) struct DatabaseCheckpointControlMetrics {
    // Histogram of barrier latency in seconds.
    barrier_latency: LabelGuardedHistogram,
    // Gauge of barriers injected but not yet collected.
    in_flight_barrier_nums: LabelGuardedIntGauge,
    // Gauge of in-flight plus collected-but-uncommitted barriers.
    all_barrier_nums: LabelGuardedIntGauge,
}
627
628impl DatabaseCheckpointControlMetrics {
629 pub(in crate::barrier) fn new(database_id: DatabaseId) -> Self {
630 let database_id_str = database_id.to_string();
631 let barrier_latency = GLOBAL_META_METRICS
632 .barrier_latency
633 .with_guarded_label_values(&[&database_id_str]);
634 let in_flight_barrier_nums = GLOBAL_META_METRICS
635 .in_flight_barrier_nums
636 .with_guarded_label_values(&[&database_id_str]);
637 let all_barrier_nums = GLOBAL_META_METRICS
638 .all_barrier_nums
639 .with_guarded_label_values(&[&database_id_str]);
640 Self {
641 barrier_latency,
642 in_flight_barrier_nums,
643 all_barrier_nums,
644 }
645 }
646}
647
648impl PartialGraphStat for DatabaseCheckpointControlMetrics {
649 fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
650 self.barrier_latency.observe(barrier_latency_secs);
651 }
652
653 fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize) {
654 self.in_flight_barrier_nums.set(inflight_barrier_num as _);
655 self.all_barrier_nums
656 .set((inflight_barrier_num + collected_barrier_num) as _);
657 }
658}
659
/// Per-database barrier checkpoint state while the database is running.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    // Id of the database-level partial graph (no independent job component).
    partial_graph_id: PartialGraphId,
    pub(super) state: BarrierWorkerState,

    // Collects the responses and tracking info of finishing independent jobs,
    // keyed by the command epoch that must wait for them before committing.
    finishing_jobs_collector:
        BarrierItemCollector<JobId, (Vec<BarrierCompleteResponse>, TrackingJob), ()>,
    // `prev_epoch` of the barrier currently being committed, if any.
    completing_barrier: Option<u64>,

    // Latest committed epoch; `None` until the first commit after creation.
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    // Independently-checkpointing jobs (e.g. creating streaming jobs),
    // keyed by job id.
    pub independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
}
677
impl DatabaseCheckpointControl {
    /// Create the control for a brand-new (empty) database.
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            partial_graph_id: to_partial_graph_id(database_id, None),
            state: BarrierWorkerState::new(),
            finishing_jobs_collector: BarrierItemCollector::new(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            independent_checkpoint_job_controls: Default::default(),
        }
    }

    /// Rebuild the control from recovered state with a known committed epoch.
    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
    ) -> Self {
        Self {
            database_id,
            partial_graph_id: to_partial_graph_id(database_id, None),
            state,
            finishing_jobs_collector: BarrierItemCollector::new(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            independent_checkpoint_job_controls,
        }
    }

    /// Whether the database can keep running after `worker_id` failed:
    /// neither the database graph nor any independent job may have fragments
    /// on that worker. Jobs without fragment info count as unaffected.
    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        !self.database_info.contains_worker(worker_id as _)
            && self
                .independent_checkpoint_job_controls
                .values()
                .all(|job| {
                    job.fragment_infos()
                        .map(|fragment_infos| {
                            !InflightFragmentInfo::contains_worker(
                                fragment_infos.values(),
                                worker_id,
                            )
                        })
                        .unwrap_or(true)
                })
    }

    /// Register a command barrier that must wait for the given independent
    /// jobs to finish before it can be committed. No-op when there is
    /// nothing to wait for.
    fn enqueue_command(&mut self, epoch: EpochPair, independent_jobs_to_wait: HashSet<JobId>) {
        let prev_epoch = epoch.prev;
        tracing::trace!(prev_epoch, ?independent_jobs_to_wait, "enqueue command");
        if !independent_jobs_to_wait.is_empty() {
            self.finishing_jobs_collector
                .enqueue(epoch, independent_jobs_to_wait, ());
        }
    }

    /// Handle a barrier collected for one of this database's partial graphs.
    ///
    /// When the graph belongs to an independent job, the barrier is forwarded
    /// to that job; if the job asks for it, a checkpoint is forced into the
    /// next barrier. Barriers for the database-level graph need no extra
    /// handling here.
    fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let prev_epoch = collected_barrier.epoch.prev;
        tracing::trace!(
            prev_epoch,
            partial_graph_id = %partial_graph_id,
            "barrier collected"
        );
        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
        assert_eq!(self.database_id, database_id);
        if let Some(independent_job_id) = independent_job_id {
            let job = self
                .independent_checkpoint_job_controls
                .get_mut(&independent_job_id)
                .expect("should exist");
            let should_force_checkpoint = job.collect(collected_barrier);
            if should_force_checkpoint {
                periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
            }
        }
        Ok(())
    }
}
767
impl DatabaseCheckpointControl {
    /// For each independent job, the upstream log epoch it pins together with
    /// the associated table ids, keyed by job id.
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.independent_checkpoint_job_controls
            .iter()
            .map(|(job_id, job)| (*job_id, job.pinned_upstream_log_epoch()))
            .collect()
    }

    /// Collect `(upstream, downstream)` fragment pairs connected by NoShuffle
    /// dispatchers, from both the database graph and any inflight independent
    /// jobs with fragment info.
    fn collect_no_shuffle_fragment_relations_for_reschedule_check(
        &self,
    ) -> Vec<(FragmentId, FragmentId)> {
        let mut no_shuffle_relations = Vec::new();
        for fragment in self.database_info.fragment_infos() {
            let downstream_fragment_id = fragment.fragment_id;
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
                    && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
                {
                    no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
                }
                true
            });
        }

        for job in self.independent_checkpoint_job_controls.values() {
            if let Some(fragment_infos) = job.fragment_infos() {
                for fragment_info in fragment_infos.values() {
                    let downstream_fragment_id = fragment_info.fragment_id;
                    visit_stream_node_cont(&fragment_info.nodes, |node| {
                        if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
                            && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
                        {
                            no_shuffle_relations
                                .push((merge.upstream_fragment_id, downstream_fragment_id));
                        }
                        true
                    });
                }
            }
        }
        no_shuffle_relations
    }

    /// Job ids that must not be rescheduled because inflight independent jobs
    /// contain online-unreschedulable scans.
    ///
    /// Seed fragments are those with such scans (plus their merge upstreams),
    /// then the set is closed over NoShuffle graph components before being
    /// mapped back to owning job ids.
    fn collect_reschedule_blocked_jobs_for_independent_jobs_inflight(
        &self,
    ) -> MetaResult<HashSet<JobId>> {
        let mut initial_blocked_fragment_ids = HashSet::new();
        for job in self.independent_checkpoint_job_controls.values() {
            if let Some(fragment_infos) = job.fragment_infos() {
                for fragment_info in fragment_infos.values() {
                    if fragment_has_online_unreschedulable_scan(fragment_info) {
                        initial_blocked_fragment_ids.insert(fragment_info.fragment_id);
                        collect_fragment_upstream_fragment_ids(
                            fragment_info,
                            &mut initial_blocked_fragment_ids,
                        );
                    }
                }
            }
        }

        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
        if !initial_blocked_fragment_ids.is_empty() {
            // Expand the blocked set across NoShuffle-connected components:
            // NoShuffle-linked fragments must be rescheduled together.
            let no_shuffle_relations =
                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
            let (forward_edges, backward_edges) =
                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
            let initial_blocked_fragment_ids: Vec<_> =
                initial_blocked_fragment_ids.iter().copied().collect();
            for ensemble in find_no_shuffle_graphs(
                &initial_blocked_fragment_ids,
                &forward_edges,
                &backward_edges,
            )? {
                blocked_fragment_ids.extend(ensemble.fragments());
            }
        }

        let mut blocked_job_ids = HashSet::new();
        blocked_job_ids.extend(
            blocked_fragment_ids
                .into_iter()
                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
        );
        Ok(blocked_job_ids)
    }

    /// Of the jobs touched by `reschedules`/`fragment_actors` (including
    /// upstream and downstream fragments), return those in `blocked_job_ids`.
    fn collect_reschedule_blocked_job_ids(
        &self,
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        blocked_job_ids: &HashSet<JobId>,
    ) -> HashSet<JobId> {
        let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
        affected_fragment_ids.extend(fragment_actors.keys().copied());
        for reschedule in reschedules.values() {
            affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
            affected_fragment_ids.extend(
                reschedule
                    .upstream_fragment_dispatcher_ids
                    .iter()
                    .map(|(fragment_id, _)| *fragment_id),
            );
        }

        affected_fragment_ids
            .into_iter()
            .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
            .filter(|job_id| blocked_job_ids.contains(job_id))
            .collect()
    }

    /// Try to start completing barriers for this database, appending the work
    /// to the shared `task`.
    ///
    /// First drains completable independent jobs (finished ones are handed to
    /// the waiting command via `finishing_jobs_collector`), then attempts to
    /// complete the database-level graph itself, bounded so it never commits
    /// past a command still waiting on finishing jobs.
    fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
        task: &mut Option<CompleteBarrierTask>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut independent_jobs_task = vec![];
        // Independent jobs can only complete once the database itself has a
        // committed epoch to anchor against.
        if let Some(committed_epoch) = self.committed_epoch {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = partial_graph_manager
                .first_inflight_barrier(self.partial_graph_id)
                .map(|epoch| epoch.prev);
            for (job_id, job) in &mut self.independent_checkpoint_job_controls {
                let IndependentCheckpointJobControl::CreatingStreamingJob(job) = job;
                if let Some((epoch, resps, info, is_finish_epoch)) = job.start_completing(
                    partial_graph_manager,
                    min_upstream_inflight_barrier,
                    committed_epoch,
                ) {
                    let resps = resps.into_values().collect_vec();
                    if is_finish_epoch {
                        // Finish epoch carries no notifiers of its own.
                        assert!(info.notifiers.is_empty());
                        finished_jobs.push((*job_id, epoch, resps));
                        continue;
                    };
                    independent_jobs_task.push((*job_id, epoch, resps, info));
                }
            }

            if !finished_jobs.is_empty() {
                // Finished jobs no longer need their own partial graphs.
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, ..)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                debug!(epoch, %job_id, "finish creating job");
                let IndependentCheckpointJobControl::CreatingStreamingJob(creating_streaming_job) =
                    self.independent_checkpoint_job_controls
                        .remove(&job_id)
                        .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();
                // Hand the finished job to whichever command barrier waits on it.
                self.finishing_jobs_collector
                    .collect(epoch, job_id, (resps, tracking_job));
            }
        }
        let mut observed_non_checkpoint = false;
        self.finishing_jobs_collector.advance_collected();
        // Never complete at or past the first command epoch that is still
        // waiting on finishing jobs.
        let epoch_end_bound = self
            .finishing_jobs_collector
            .first_inflight_epoch()
            .map_or(Unbounded, |epoch| Excluded(epoch.prev));
        if let Some((epoch, resps, info)) = partial_graph_manager.start_completing(
            self.partial_graph_id,
            epoch_end_bound,
            |_, resps, post_collect_command| {
                // Invoked for barriers folded into this completion without a
                // checkpoint of their own; still apply their side effects.
                observed_non_checkpoint = true;
                self.handle_refresh_table_info(task, &resps);
                self.database_info.apply_collected_command(
                    &post_collect_command,
                    &resps,
                    hummock_version_stats,
                );
            },
        ) {
            self.handle_refresh_table_info(task, &resps);
            self.database_info.apply_collected_command(
                &info.post_collect_command,
                &resps,
                hummock_version_stats,
            );
            let mut resps_to_commit = resps.into_values().collect_vec();
            let mut staging_commit_info = self.database_info.take_staging_commit_info();
            // Merge responses/tracking info of independent jobs whose finish
            // epoch matches the epoch being committed.
            if let Some((_, finished_jobs, _)) =
                self.finishing_jobs_collector
                    .take_collected_if(|collected_epoch| {
                        assert!(epoch <= collected_epoch.prev);
                        epoch == collected_epoch.prev
                    })
            {
                finished_jobs
                    .into_iter()
                    .for_each(|(_, (resps, tracking_job))| {
                        resps_to_commit.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
            }
            {
                let task = task.get_or_insert_default();
                Command::collect_commit_epoch_info(
                    &self.database_info,
                    &info,
                    &mut task.commit_info,
                    resps_to_commit,
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                // Remember which barrier is being committed until it is acked.
                self.completing_barrier = Some(info.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.epoch_infos
                    .try_insert(self.partial_graph_id, info)
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
            }
        } else if observed_non_checkpoint
            && self.database_info.has_pending_finished_jobs()
            && !partial_graph_manager.has_pending_checkpoint_barrier(self.partial_graph_id)
        {
            // Finished jobs are stuck behind non-checkpoint barriers with no
            // checkpoint scheduled: force one so they can be committed.
            periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
        }
        if !independent_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, info) in independent_jobs_task {
                collect_independent_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    &info,
                );
                task.epoch_infos
                    .try_insert(to_partial_graph_id(self.database_id, Some(job_id)), info)
                    .expect("non duplicate");
            }
        }
    }

    /// Acknowledge that the barrier at `command_prev_epoch` and the given
    /// independent-job epochs have been durably committed.
    fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        command_prev_epoch: Option<u64>,
        independent_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                // The acked epoch must match the one we started committing.
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
                partial_graph_manager.ack_completed(self.partial_graph_id, prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in independent_job_epochs {
                // The job may already be gone if it finished in the meantime.
                if let Some(job) = self.independent_checkpoint_job_controls.get_mut(&job_id) {
                    job.ack_completed(partial_graph_manager, epoch);
                }
            }
        }
    }

    /// Fold source list/load completion and table refresh completion signals
    /// from the workers' barrier responses into the pending task, creating
    /// the task lazily only when there is something to record.
    fn handle_refresh_table_info(
        &self,
        task: &mut Option<CompleteBarrierTask>,
        resps: &HashMap<WorkerId, BarrierCompleteResponse>,
    ) {
        let list_finished_info = resps
            .values()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = resps
            .values()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = resps
            .values()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}
1081
impl DatabaseCheckpointControl {
    /// Injects the next barrier for this database, optionally carrying a command.
    ///
    /// Several command kinds are first validated against the in-flight independent
    /// checkpoint jobs (`independent_checkpoint_job_controls`); combinations that
    /// cannot be handled are rejected by failing the command's notifiers and
    /// returning `Ok(())`, so the barrier loop keeps running. Only a failure of
    /// `apply_command` itself is propagated as `Err` (after failing the notifiers
    /// with the same error).
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        // Epoch the new barrier will carry: successor of the latest in-flight prev epoch.
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        // Unpack the command and its notifiers; a command-less barrier gets an empty list.
        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        // A `RescheduleIntent` must arrive here with its plan already resolved.
        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                // Multi-job drop: refuse the whole command if any target is a creating
                // snapshot-backfill job tracked as an independent checkpoint job.
                for job_to_cancel in streaming_job_ids {
                    if self
                        .independent_checkpoint_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
                                the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(job) = self
                    .independent_checkpoint_job_controls
                    .get_mut(job_to_drop)
            {
                // Single-job drop on an independent job: delegate to the job control.
                // When it reports the drop as fully handled, no barrier command is needed.
                let dropped = job.drop(&mut notifiers, partial_graph_manager);
                if dropped {
                    return Ok(());
                }
            }
        }

        // Multi-job throttle that touches any independent checkpoint job is rejected
        // as a whole rather than partially applied.
        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(independent_job_id) = jobs
                .iter()
                .find(|job| self.independent_checkpoint_job_controls.contains_key(*job))
        {
            warn!(
                job_id = %independent_job_id,
                "ignore multi-job throttle command on independent checkpoint job"
            );
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot alter rate limit for independent checkpoint job with other jobs, \
                        the original rate limit will be kept during recovery."
                    )
                    .into(),
                );
            }
            return Ok(());
        };

        // Reject a reschedule whose fragments overlap jobs blocked by in-flight
        // independent (unreschedulable backfill) jobs.
        if let Some(Command::RescheduleIntent {
            reschedule_plan: Some(reschedule_plan),
            ..
        }) = &command
            && !self.independent_checkpoint_job_controls.is_empty()
        {
            let blocked_job_ids =
                self.collect_reschedule_blocked_jobs_for_independent_jobs_inflight()?;
            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
                &reschedule_plan.reschedules,
                &reschedule_plan.fragment_actors,
                &blocked_job_ids,
            );
            if !blocked_reschedule_job_ids.is_empty() {
                warn!(
                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
                );
                for notifier in notifiers {
                    notifier.notify_start_failed(
                        anyhow!(
                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
                            blocked_reschedule_job_ids
                        )
                        .into(),
                    );
                }
                return Ok(());
            }
        }

        // Empty database and no job-creating command: nothing to inject a barrier
        // into, so short-circuit by notifying waiters as started + collected.
        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.independent_checkpoint_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        // Snapshot backfill cannot start while the database barrier stream is paused.
        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        // Advance the barrier state machine and record the epoch on the tracing span.
        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        let epoch = barrier_info.epoch();
        // Apply the command. On success the notifiers have been consumed by
        // `apply_command` (asserted empty); on failure every waiter is failed with
        // the error and the error is propagated to the caller.
        let ApplyCommandInfo { jobs_to_wait } = match self.apply_command(
            command,
            &mut notifiers,
            barrier_info,
            partial_graph_manager,
            hummock_version_stats,
            adaptive_parallelism_strategy,
            worker_nodes,
        ) {
            Ok(info) => {
                assert!(notifiers.is_empty());
                info
            }
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        self.enqueue_command(epoch, jobs_to_wait);

        Ok(())
    }
}