1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::ops::Bound::{Excluded, Unbounded};
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use itertools::Itertools;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
27use risingwave_common::util::epoch::EpochPair;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_meta_model::WorkerId;
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::{FragmentId, PartialGraphId};
33use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
34use risingwave_pb::stream_plan::stream_node::NodeBody;
35use risingwave_pb::stream_service::BarrierCompleteResponse;
36use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
37use tracing::{debug, warn};
38
39use crate::barrier::cdc_progress::CdcProgress;
40use crate::barrier::checkpoint::independent_job::IndependentCheckpointJobControl;
41use crate::barrier::checkpoint::recovery::{
42 DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
43 RecoveringStateAction,
44};
45use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
46use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
47use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
48use crate::barrier::notifier::Notifier;
49use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphStat};
50use crate::barrier::progress::TrackingJob;
51use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
52use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
53use crate::barrier::utils::{BarrierItemCollector, collect_independent_job_commit_epoch_info};
54use crate::barrier::{
55 BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
56};
57use crate::controller::fragment::InflightFragmentInfo;
58use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
59use crate::manager::MetaSrvEnv;
60
61fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
62 let mut has_unreschedulable_scan = false;
63 visit_stream_node_cont(&fragment.nodes, |node| {
64 if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
65 let scan_type = stream_scan.stream_scan_type();
66 if !scan_type.is_reschedulable(true) {
67 has_unreschedulable_scan = true;
68 return false;
69 }
70 }
71 true
72 });
73 has_unreschedulable_scan
74}
75
76fn collect_fragment_upstream_fragment_ids(
77 fragment: &InflightFragmentInfo,
78 upstream_fragment_ids: &mut HashSet<FragmentId>,
79) {
80 visit_stream_node_cont(&fragment.nodes, |node| {
81 if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
82 upstream_fragment_ids.insert(merge.upstream_fragment_id);
83 }
84 true
85 });
86}
87
88use crate::model::ActorId;
89use crate::rpc::metrics::GLOBAL_META_METRICS;
90use crate::{MetaError, MetaResult};
91
/// Barrier checkpoint bookkeeping for every database handled by the barrier worker.
pub(crate) struct CheckpointControl {
    /// Meta service environment; provides `opts.in_flight_barrier_nums` and the shared actor
    /// infos handed to newly created databases.
    pub(crate) env: MetaSrvEnv,
    /// Per-database status: either running normally or in per-database recovery.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Latest hummock version stats, overwritten on every `ack_completed`.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// Maximum number of barriers in flight on a database's partial graph before periodic
    /// (command-less) barriers are skipped; initialized from `env.opts`.
    pub(crate) in_flight_barrier_nums: usize,
}
99
100impl CheckpointControl {
101 pub fn new(env: MetaSrvEnv) -> Self {
102 Self {
103 in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
104 env,
105 databases: Default::default(),
106 hummock_version_stats: Default::default(),
107 }
108 }
109
110 pub(crate) fn recover(
111 databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
112 failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, hummock_version_stats: HummockVersionStats,
114 env: MetaSrvEnv,
115 ) -> Self {
116 env.shared_actor_infos()
117 .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
118 Self {
119 in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
120 env,
121 databases: databases
122 .into_iter()
123 .map(|(database_id, control)| {
124 (
125 database_id,
126 DatabaseCheckpointControlStatus::Running(control),
127 )
128 })
129 .chain(failed_databases.into_iter().map(
130 |(database_id, resetting_partial_graphs)| {
131 (
132 database_id,
133 DatabaseCheckpointControlStatus::Recovering(
134 DatabaseRecoveringState::new_resetting(
135 database_id,
136 resetting_partial_graphs,
137 ),
138 ),
139 )
140 },
141 ))
142 .collect(),
143 hummock_version_stats,
144 }
145 }
146
147 pub(crate) fn ack_completed(
148 &mut self,
149 partial_graph_manager: &mut PartialGraphManager,
150 output: BarrierCompleteOutput,
151 ) {
152 self.hummock_version_stats = output.hummock_version_stats;
153 for (database_id, (command_prev_epoch, independent_job_epochs)) in output.epochs_to_ack {
154 self.databases
155 .get_mut(&database_id)
156 .expect("should exist")
157 .expect_running("should have wait for completing command before enter recovery")
158 .ack_completed(
159 partial_graph_manager,
160 command_prev_epoch,
161 independent_job_epochs,
162 );
163 }
164 }
165
166 pub(crate) fn next_complete_barrier_task(
167 &mut self,
168 periodic_barriers: &mut PeriodicBarriers,
169 partial_graph_manager: &mut PartialGraphManager,
170 ) -> Option<CompleteBarrierTask> {
171 let mut task = None;
172 for database in self.databases.values_mut() {
173 let Some(database) = database.running_state_mut() else {
174 continue;
175 };
176 database.next_complete_barrier_task(
177 periodic_barriers,
178 partial_graph_manager,
179 &mut task,
180 &self.hummock_version_stats,
181 );
182 }
183 task
184 }
185
186 pub(crate) fn barrier_collected(
187 &mut self,
188 partial_graph_id: PartialGraphId,
189 collected_barrier: CollectedBarrier<'_>,
190 periodic_barriers: &mut PeriodicBarriers,
191 ) -> MetaResult<()> {
192 let (database_id, _) = from_partial_graph_id(partial_graph_id);
193 let database_status = self.databases.get_mut(&database_id).expect("should exist");
194 match database_status {
195 DatabaseCheckpointControlStatus::Running(database) => {
196 database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
197 }
198 DatabaseCheckpointControlStatus::Recovering(_) => {
199 if cfg!(debug_assertions) {
200 panic!(
201 "receive collected barrier {:?} on recovering database {} from partial graph {}",
202 collected_barrier, database_id, partial_graph_id
203 );
204 } else {
205 warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
206 }
207 Ok(())
208 }
209 }
210 }
211
212 pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
213 self.databases.iter().filter_map(|(database_id, database)| {
214 database.running_state().is_none().then_some(*database_id)
215 })
216 }
217
218 pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
219 self.databases.iter().filter_map(|(database_id, database)| {
220 database.running_state().is_some().then_some(*database_id)
221 })
222 }
223
224 pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
225 self.databases
226 .get(&database_id)
227 .and_then(|database| database.running_state())
228 .map(|database| &database.database_info)
229 }
230
231 pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
232 self.databases
233 .values()
234 .any(|database| database.may_have_snapshot_backfilling_jobs())
235 }
236
237 pub(crate) fn handle_new_barrier(
239 &mut self,
240 new_barrier: NewBarrier,
241 partial_graph_manager: &mut PartialGraphManager,
242 worker_nodes: &HashMap<WorkerId, WorkerNode>,
243 ) -> MetaResult<()> {
244 let NewBarrier {
245 database_id,
246 command,
247 span,
248 checkpoint,
249 } = new_barrier;
250
251 if let Some((mut command, notifiers)) = command {
252 if let &mut Command::CreateStreamingJob {
253 ref mut cross_db_snapshot_backfill_info,
254 ref info,
255 ..
256 } = &mut command
257 {
258 for (table_id, snapshot_epoch) in
259 &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
260 {
261 for database in self.databases.values() {
262 if let Some(database) = database.running_state()
263 && database.database_info.contains_job(table_id.as_job_id())
264 {
265 if let Some(committed_epoch) = database.committed_epoch {
266 *snapshot_epoch = Some(committed_epoch);
267 }
268 break;
269 }
270 }
271 if snapshot_epoch.is_none() {
272 let table_id = *table_id;
273 warn!(
274 ?cross_db_snapshot_backfill_info,
275 ?table_id,
276 ?info,
277 "database of cross db upstream table not found"
278 );
279 let err: MetaError =
280 anyhow!("database of cross db upstream table {} not found", table_id)
281 .into();
282 for notifier in notifiers {
283 notifier.notify_start_failed(err.clone());
284 }
285
286 return Ok(());
287 }
288 }
289 }
290
291 let database = match self.databases.entry(database_id) {
292 Entry::Occupied(entry) => entry
293 .into_mut()
294 .expect_running("should not have command when not running"),
295 Entry::Vacant(entry) => match &command {
296 Command::CreateStreamingJob { info, job_type, .. } => {
297 let CreateStreamingJobType::Normal = job_type else {
298 if cfg!(debug_assertions) {
299 panic!(
300 "unexpected first job of type {job_type:?} with info {info:?}"
301 );
302 } else {
303 for notifier in notifiers {
304 notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
305 }
306 return Ok(());
307 }
308 };
309 let new_database = DatabaseCheckpointControl::new(
310 database_id,
311 self.env.shared_actor_infos().clone(),
312 );
313 let adder = partial_graph_manager.add_partial_graph(
314 to_partial_graph_id(database_id, None),
315 DatabaseCheckpointControlMetrics::new(database_id),
316 );
317 adder.added();
318 entry
319 .insert(DatabaseCheckpointControlStatus::Running(new_database))
320 .expect_running("just initialized as running")
321 }
322 Command::Flush
323 | Command::Pause
324 | Command::Resume
325 | Command::DropStreamingJobs { .. }
326 | Command::DropSubscription { .. } => {
327 for mut notifier in notifiers {
328 notifier.notify_started();
329 notifier.notify_collected();
330 }
331 warn!(?command, "skip command for empty database");
332 return Ok(());
333 }
334 Command::RescheduleIntent { .. }
335 | Command::ReplaceStreamJob(_)
336 | Command::SourceChangeSplit(_)
337 | Command::Throttle { .. }
338 | Command::CreateSubscription { .. }
339 | Command::AlterSubscriptionRetention { .. }
340 | Command::ConnectorPropsChange(_)
341 | Command::Refresh { .. }
342 | Command::ListFinish { .. }
343 | Command::LoadFinish { .. }
344 | Command::ResetSource { .. }
345 | Command::ResumeBackfill { .. }
346 | Command::InjectSourceOffsets { .. } => {
347 if cfg!(debug_assertions) {
348 panic!(
349 "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
350 database_id, command
351 )
352 } else {
353 warn!(%database_id, ?command, "database not exist when handling command");
354 for notifier in notifiers {
355 notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
356 }
357 return Ok(());
358 }
359 }
360 },
361 };
362
363 database.handle_new_barrier(
364 Some((command, notifiers)),
365 checkpoint,
366 span,
367 partial_graph_manager,
368 &self.hummock_version_stats,
369 worker_nodes,
370 )
371 } else {
372 let database = match self.databases.entry(database_id) {
373 Entry::Occupied(entry) => entry.into_mut(),
374 Entry::Vacant(_) => {
375 return Ok(());
378 }
379 };
380 let Some(database) = database.running_state_mut() else {
381 return Ok(());
383 };
384 if partial_graph_manager.inflight_barrier_num(database.partial_graph_id)
385 >= self.in_flight_barrier_nums
386 {
387 return Ok(());
389 }
390 database.handle_new_barrier(
391 None,
392 checkpoint,
393 span,
394 partial_graph_manager,
395 &self.hummock_version_stats,
396 worker_nodes,
397 )
398 }
399 }
400
401 pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
402 let mut progress = HashMap::new();
403 for status in self.databases.values() {
404 let Some(database_checkpoint_control) = status.running_state() else {
405 continue;
406 };
407 progress.extend(
409 database_checkpoint_control
410 .database_info
411 .gen_backfill_progress(),
412 );
413 for (job_id, job) in &database_checkpoint_control.independent_checkpoint_job_controls {
415 if let Some(p) = job.gen_backfill_progress() {
416 progress.insert(*job_id, p);
417 }
418 }
419 }
420 progress
421 }
422
423 pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
424 let mut progress = Vec::new();
425 for status in self.databases.values() {
426 let Some(database_checkpoint_control) = status.running_state() else {
427 continue;
428 };
429 progress.extend(
430 database_checkpoint_control
431 .database_info
432 .gen_fragment_backfill_progress(),
433 );
434 for job in database_checkpoint_control
435 .independent_checkpoint_job_controls
436 .values()
437 {
438 progress.extend(job.gen_fragment_backfill_progress());
439 }
440 }
441 progress
442 }
443
444 pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
445 let mut progress = HashMap::new();
446 for status in self.databases.values() {
447 let Some(database_checkpoint_control) = status.running_state() else {
448 continue;
449 };
450 progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
452 }
453 progress
454 }
455
456 pub(crate) fn databases_failed_at_worker_err(
457 &mut self,
458 worker_id: WorkerId,
459 ) -> impl Iterator<Item = DatabaseId> + '_ {
460 self.databases
461 .iter_mut()
462 .filter_map(
463 move |(database_id, database_status)| match database_status {
464 DatabaseCheckpointControlStatus::Running(control) => {
465 if !control.is_valid_after_worker_err(worker_id) {
466 Some(*database_id)
467 } else {
468 None
469 }
470 }
471 DatabaseCheckpointControlStatus::Recovering(state) => {
472 if !state.is_valid_after_worker_err(worker_id) {
473 Some(*database_id)
474 } else {
475 None
476 }
477 }
478 },
479 )
480 }
481}
482
/// Event yielded by `CheckpointControl::next_event` when a recovering database is ready to move
/// to its next recovery phase. Each variant carries an action bound to that database.
pub(crate) enum CheckpointControlEvent<'a> {
    /// The recovering database should enter the initializing phase.
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// The recovering database should become running again.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
487
impl CheckpointControl {
    /// Handles the reset responses of a partial graph.
    ///
    /// - On a running database, only independent-job partial graphs are expected. The job is
    ///   removed and notified. If the job is unknown, debug builds panic and release builds log
    ///   and ignore it. `reset_resps` is not inspected in this branch.
    /// - On a recovering database, the responses are forwarded to the recovery state.
    ///
    /// # Panics
    /// If the database is unknown, or if a database-level partial graph reset arrives while the
    /// database is running.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(independent_job_id) = independent_job_id {
                    match database
                        .independent_checkpoint_job_controls
                        .remove(&independent_job_id)
                    {
                        Some(independent_job) => {
                            independent_job.on_partial_graph_reset();
                        }
                        None => {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "receive reset partial graph resp on non-existing independent job {independent_job_id} in database {database_id}"
                                )
                            }
                            warn!(
                                %database_id,
                                %independent_job_id,
                                "ignore reset partial graph resp on non-existing independent job on running database"
                            );
                        }
                    }
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }

    /// Forwards a partial-graph-initialized notification to the recovering database.
    ///
    /// # Panics
    /// If the database is unknown or already running.
    pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not have partial graph initialized when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.partial_graph_initialized(partial_graph_id);
            }
        }
    }

    /// Returns a future that resolves when some recovering database is ready to change phase.
    ///
    /// Running databases never produce events here. The first recovering database whose
    /// `poll_next_event` is ready determines the event. `&mut self` is held in an `Option` so it
    /// can be moved into the returned action. Polling the future again after it resolved is a bug.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            // Move the `&mut Self` out; the future is finished after this.
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}
578
/// Lifecycle status of a database tracked by `CheckpointControl`.
pub(crate) enum DatabaseCheckpointControlStatus {
    /// Normal operation: barriers are injected, collected and completed.
    Running(DatabaseCheckpointControl),
    /// Per-database recovery in progress. `CheckpointControl::handle_new_barrier` panics if a
    /// command targets a database in this state.
    Recovering(DatabaseRecoveringState),
}
583
584impl DatabaseCheckpointControlStatus {
585 fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
586 match self {
587 DatabaseCheckpointControlStatus::Running(state) => Some(state),
588 DatabaseCheckpointControlStatus::Recovering(_) => None,
589 }
590 }
591
592 fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
593 match self {
594 DatabaseCheckpointControlStatus::Running(state) => Some(state),
595 DatabaseCheckpointControlStatus::Recovering(_) => None,
596 }
597 }
598
599 fn may_have_snapshot_backfilling_jobs(&self) -> bool {
600 self.running_state()
601 .map(|database| {
602 database
603 .independent_checkpoint_job_controls
604 .values()
605 .any(|job| job.is_snapshot_backfilling())
606 })
607 .unwrap_or(true) }
609
610 fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
611 match self {
612 DatabaseCheckpointControlStatus::Running(state) => state,
613 DatabaseCheckpointControlStatus::Recovering(_) => {
614 panic!("should be at running: {}", reason)
615 }
616 }
617 }
618}
619
/// Per-database barrier metrics, labelled with the database id and reported through the
/// `PartialGraphStat` implementation.
pub(in crate::barrier) struct DatabaseCheckpointControlMetrics {
    /// Histogram of barrier latencies, in seconds.
    barrier_latency: LabelGuardedHistogram,
    /// Number of barriers currently in flight.
    in_flight_barrier_nums: LabelGuardedIntGauge,
    /// In-flight barriers plus collected barriers.
    all_barrier_nums: LabelGuardedIntGauge,
}
625
626impl DatabaseCheckpointControlMetrics {
627 pub(in crate::barrier) fn new(database_id: DatabaseId) -> Self {
628 let database_id_str = database_id.to_string();
629 let barrier_latency = GLOBAL_META_METRICS
630 .barrier_latency
631 .with_guarded_label_values(&[&database_id_str]);
632 let in_flight_barrier_nums = GLOBAL_META_METRICS
633 .in_flight_barrier_nums
634 .with_guarded_label_values(&[&database_id_str]);
635 let all_barrier_nums = GLOBAL_META_METRICS
636 .all_barrier_nums
637 .with_guarded_label_values(&[&database_id_str]);
638 Self {
639 barrier_latency,
640 in_flight_barrier_nums,
641 all_barrier_nums,
642 }
643 }
644}
645
646impl PartialGraphStat for DatabaseCheckpointControlMetrics {
647 fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
648 self.barrier_latency.observe(barrier_latency_secs);
649 }
650
651 fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize) {
652 self.in_flight_barrier_nums.set(inflight_barrier_num as _);
653 self.all_barrier_nums
654 .set((inflight_barrier_num + collected_barrier_num) as _);
655 }
656}
657
/// Barrier checkpoint state of one running database.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    /// Database this control belongs to.
    pub(super) database_id: DatabaseId,
    /// Partial graph of the database itself (`to_partial_graph_id(database_id, None)`).
    partial_graph_id: PartialGraphId,
    /// Barrier worker state of the database.
    pub(super) state: BarrierWorkerState,

    /// Per epoch, gathers the finished creating jobs (their last responses and tracking job)
    /// that are committed together with the database barrier at that epoch.
    finishing_jobs_collector:
        BarrierItemCollector<JobId, (Vec<BarrierCompleteResponse>, TrackingJob), ()>,
    /// Prev epoch of the database barrier currently being completed, if any.
    completing_barrier: Option<u64>,

    /// Last committed epoch. `None` for a freshly created database until its first ack.
    committed_epoch: Option<u64>,

    /// Inflight fragment and job information of the database.
    pub(super) database_info: InflightDatabaseInfo,
    /// Jobs checkpointed through their own partial graphs (creating streaming jobs and batch
    /// refresh jobs), keyed by job id.
    pub independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
}
675
676impl DatabaseCheckpointControl {
677 fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
678 Self {
679 database_id,
680 partial_graph_id: to_partial_graph_id(database_id, None),
681 state: BarrierWorkerState::new(),
682 finishing_jobs_collector: BarrierItemCollector::new(),
683 completing_barrier: None,
684 committed_epoch: None,
685 database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
686 independent_checkpoint_job_controls: Default::default(),
687 }
688 }
689
690 pub(crate) fn recovery(
691 database_id: DatabaseId,
692 state: BarrierWorkerState,
693 committed_epoch: u64,
694 database_info: InflightDatabaseInfo,
695 independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
696 ) -> Self {
697 Self {
698 database_id,
699 partial_graph_id: to_partial_graph_id(database_id, None),
700 state,
701 finishing_jobs_collector: BarrierItemCollector::new(),
702 completing_barrier: None,
703 committed_epoch: Some(committed_epoch),
704 database_info,
705 independent_checkpoint_job_controls,
706 }
707 }
708
709 pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
710 !self.database_info.contains_worker(worker_id as _)
711 && self
712 .independent_checkpoint_job_controls
713 .values()
714 .all(|job| {
715 job.fragment_infos()
716 .map(|fragment_infos| {
717 !InflightFragmentInfo::contains_worker(
718 fragment_infos.values(),
719 worker_id,
720 )
721 })
722 .unwrap_or(true)
723 })
724 }
725
726 fn enqueue_command(&mut self, epoch: EpochPair, independent_jobs_to_wait: HashSet<JobId>) {
728 let prev_epoch = epoch.prev;
729 tracing::trace!(prev_epoch, ?independent_jobs_to_wait, "enqueue command");
730 if !independent_jobs_to_wait.is_empty() {
731 self.finishing_jobs_collector
732 .enqueue(epoch, independent_jobs_to_wait, ());
733 }
734 }
735
736 fn barrier_collected(
739 &mut self,
740 partial_graph_id: PartialGraphId,
741 collected_barrier: CollectedBarrier<'_>,
742 periodic_barriers: &mut PeriodicBarriers,
743 ) -> MetaResult<()> {
744 let prev_epoch = collected_barrier.epoch.prev;
745 tracing::trace!(
746 prev_epoch,
747 partial_graph_id = %partial_graph_id,
748 "barrier collected"
749 );
750 let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
751 assert_eq!(self.database_id, database_id);
752 if let Some(independent_job_id) = independent_job_id {
753 let job = self
754 .independent_checkpoint_job_controls
755 .get_mut(&independent_job_id)
756 .expect("should exist");
757 let should_force_checkpoint = job.collect(collected_barrier);
758 if should_force_checkpoint {
759 periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
760 }
761 }
762 Ok(())
763 }
764}
765
766impl DatabaseCheckpointControl {
767 fn collect_backfill_pinned_upstream_log_epoch(
769 &self,
770 ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
771 self.independent_checkpoint_job_controls
772 .iter()
773 .map(|(job_id, job)| (*job_id, job.pinned_upstream_log_epoch()))
774 .collect()
775 }
776
777 fn collect_no_shuffle_fragment_relations_for_reschedule_check(
778 &self,
779 ) -> Vec<(FragmentId, FragmentId)> {
780 let mut no_shuffle_relations = Vec::new();
781 for fragment in self.database_info.fragment_infos() {
782 let downstream_fragment_id = fragment.fragment_id;
783 visit_stream_node_cont(&fragment.nodes, |node| {
784 if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
785 && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
786 {
787 no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
788 }
789 true
790 });
791 }
792
793 for job in self.independent_checkpoint_job_controls.values() {
794 if let Some(fragment_infos) = job.fragment_infos() {
795 for fragment_info in fragment_infos.values() {
796 let downstream_fragment_id = fragment_info.fragment_id;
797 visit_stream_node_cont(&fragment_info.nodes, |node| {
798 if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
799 && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
800 {
801 no_shuffle_relations
802 .push((merge.upstream_fragment_id, downstream_fragment_id));
803 }
804 true
805 });
806 }
807 }
808 }
809 no_shuffle_relations
810 }
811
812 fn collect_reschedule_blocked_jobs_for_independent_jobs_inflight(
813 &self,
814 ) -> MetaResult<HashSet<JobId>> {
815 let mut initial_blocked_fragment_ids = HashSet::new();
816 for job in self.independent_checkpoint_job_controls.values() {
817 if let Some(fragment_infos) = job.fragment_infos() {
818 for fragment_info in fragment_infos.values() {
819 if fragment_has_online_unreschedulable_scan(fragment_info) {
820 initial_blocked_fragment_ids.insert(fragment_info.fragment_id);
821 collect_fragment_upstream_fragment_ids(
822 fragment_info,
823 &mut initial_blocked_fragment_ids,
824 );
825 }
826 }
827 }
828 }
829
830 let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
831 if !initial_blocked_fragment_ids.is_empty() {
832 let no_shuffle_relations =
833 self.collect_no_shuffle_fragment_relations_for_reschedule_check();
834 let (forward_edges, backward_edges) =
835 build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
836 let initial_blocked_fragment_ids: Vec<_> =
837 initial_blocked_fragment_ids.iter().copied().collect();
838 for ensemble in find_no_shuffle_graphs(
839 &initial_blocked_fragment_ids,
840 &forward_edges,
841 &backward_edges,
842 )? {
843 blocked_fragment_ids.extend(ensemble.fragments());
844 }
845 }
846
847 let mut blocked_job_ids = HashSet::new();
848 blocked_job_ids.extend(
849 blocked_fragment_ids
850 .into_iter()
851 .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
852 );
853 Ok(blocked_job_ids)
854 }
855
856 fn collect_reschedule_blocked_job_ids(
857 &self,
858 reschedules: &HashMap<FragmentId, Reschedule>,
859 fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
860 blocked_job_ids: &HashSet<JobId>,
861 ) -> HashSet<JobId> {
862 let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
863 affected_fragment_ids.extend(fragment_actors.keys().copied());
864 for reschedule in reschedules.values() {
865 affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
866 affected_fragment_ids.extend(
867 reschedule
868 .upstream_fragment_dispatcher_ids
869 .iter()
870 .map(|(fragment_id, _)| *fragment_id),
871 );
872 }
873
874 affected_fragment_ids
875 .into_iter()
876 .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
877 .filter(|job_id| blocked_job_ids.contains(job_id))
878 .collect()
879 }
880
    /// Adds this database's ready-to-complete work to the shared completion `task`.
    ///
    /// 1. Only once the database has a committed epoch: each independent job (creating streaming
    ///    job or batch refresh job) is asked for a barrier to complete on its own partial graph.
    ///    A creating job that reached its finish epoch is not completed on its own. Its partial
    ///    graph is removed, and its responses and tracking job go into `finishing_jobs_collector`.
    ///    From there they are committed together with the database barrier at that epoch.
    /// 2. The database partial graph completes its next barrier, bounded by the first epoch the
    ///    collector is still waiting on.
    /// 3. If only non-checkpoint barriers were observed, finished jobs are pending, and no
    ///    checkpoint barrier is in flight, a checkpoint is forced on the next barrier.
    ///
    /// `task` is created lazily, only when something is added to it.
    fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
        task: &mut Option<CompleteBarrierTask>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // (job_id, epoch, responses, info) of independent-job barriers completed on their own.
        let mut independent_jobs_task = vec![];
        if let Some(committed_epoch) = self.committed_epoch {
            // Creating jobs that reached their finish epoch: (job_id, epoch, responses).
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = partial_graph_manager
                .first_inflight_barrier(self.partial_graph_id)
                .map(|epoch| epoch.prev);
            for (job_id, job) in &mut self.independent_checkpoint_job_controls {
                match job {
                    IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) => {
                        if let Some((epoch, resps, info, is_finish_epoch)) = creating_job
                            .start_completing(
                                partial_graph_manager,
                                min_upstream_inflight_barrier,
                                committed_epoch,
                            )
                        {
                            let resps = resps.into_values().collect_vec();
                            if is_finish_epoch {
                                assert!(info.notifiers.is_empty());
                                finished_jobs.push((*job_id, epoch, resps));
                                continue;
                            };
                            independent_jobs_task.push((*job_id, epoch, resps, info));
                        }
                    }
                    IndependentCheckpointJobControl::BatchRefresh(batch_refresh_job) => {
                        if let Some((epoch, resps, info, tracking_job)) =
                            batch_refresh_job.start_completing(partial_graph_manager)
                        {
                            let resps = resps.into_values().collect_vec();
                            if let Some(tracking_job) = tracking_job {
                                let task = task.get_or_insert_default();
                                task.finished_jobs.push(tracking_job);
                            }
                            independent_jobs_task.push((*job_id, epoch, resps, info));
                        }
                    }
                }
            }
            if !finished_jobs.is_empty() {
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, ..)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            // Hand finished creating jobs over to the collector so that they are committed with
            // the database barrier of the same epoch.
            for (job_id, epoch, resps) in finished_jobs {
                debug!(epoch, %job_id, "finish creating job");
                let Some(IndependentCheckpointJobControl::CreatingStreamingJob(
                    creating_streaming_job,
                )) = self.independent_checkpoint_job_controls.remove(&job_id)
                else {
                    panic!("finished job {job_id} should be a creating streaming job");
                };
                let tracking_job = creating_streaming_job.into_tracking_job();
                self.finishing_jobs_collector
                    .collect(epoch, job_id, (resps, tracking_job));
            }
        }
        let mut observed_non_checkpoint = false;
        self.finishing_jobs_collector.advance_collected();
        // Do not complete database barriers past an epoch that still waits for finishing jobs.
        let epoch_end_bound = self
            .finishing_jobs_collector
            .first_inflight_epoch()
            .map_or(Unbounded, |epoch| Excluded(epoch.prev));
        if let Some((epoch, resps, info)) = partial_graph_manager.start_completing(
            self.partial_graph_id,
            epoch_end_bound,
            // Presumably invoked for each non-checkpoint barrier drained before the returned one:
            // their collected commands are applied to `database_info` right away.
            |_, resps, post_collect_command| {
                observed_non_checkpoint = true;
                self.handle_refresh_table_info(task, &resps);
                self.database_info.apply_collected_command(
                    &post_collect_command,
                    &resps,
                    hummock_version_stats,
                );
            },
        ) {
            self.handle_refresh_table_info(task, &resps);
            self.database_info.apply_collected_command(
                &info.post_collect_command,
                &resps,
                hummock_version_stats,
            );
            let mut resps_to_commit = resps.into_values().collect_vec();
            let mut staging_commit_info = self.database_info.take_staging_commit_info();
            // Merge in creating jobs that finished exactly at this epoch; a collected epoch can
            // never be earlier than the one being completed.
            if let Some((_, finished_jobs, _)) =
                self.finishing_jobs_collector
                    .take_collected_if(|collected_epoch| {
                        assert!(epoch <= collected_epoch.prev);
                        epoch == collected_epoch.prev
                    })
            {
                finished_jobs
                    .into_iter()
                    .for_each(|(_, (resps, tracking_job))| {
                        resps_to_commit.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
            }
            {
                let task = task.get_or_insert_default();
                Command::collect_commit_epoch_info(
                    &self.database_info,
                    &info,
                    &mut task.commit_info,
                    resps_to_commit,
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                // Remembered so that `ack_completed` can verify and advance `committed_epoch`.
                self.completing_barrier = Some(info.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.epoch_infos
                    .try_insert(self.partial_graph_id, info)
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
            }
        } else if observed_non_checkpoint
            && self.database_info.has_pending_finished_jobs()
            && !partial_graph_manager.has_pending_checkpoint_barrier(self.partial_graph_id)
        {
            // Finished jobs are waiting for a checkpoint that is not coming on its own.
            periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
        }
        if !independent_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, info) in independent_jobs_task {
                collect_independent_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    &info,
                );
                task.epoch_infos
                    .try_insert(to_partial_graph_id(self.database_id, Some(job_id)), info)
                    .expect("non duplicate");
            }
        }
    }
1034
1035 fn ack_completed(
1036 &mut self,
1037 partial_graph_manager: &mut PartialGraphManager,
1038 command_prev_epoch: Option<u64>,
1039 independent_job_epochs: Vec<(JobId, u64)>,
1040 ) {
1041 {
1042 if let Some(prev_epoch) = self.completing_barrier.take() {
1043 assert_eq!(command_prev_epoch, Some(prev_epoch));
1044 self.committed_epoch = Some(prev_epoch);
1045 partial_graph_manager.ack_completed(self.partial_graph_id, prev_epoch);
1046 } else {
1047 assert_eq!(command_prev_epoch, None);
1048 };
1049 for (job_id, epoch) in independent_job_epochs {
1050 if let Some(job) = self.independent_checkpoint_job_controls.get_mut(&job_id) {
1051 job.ack_completed(partial_graph_manager, epoch);
1052 }
1053 }
1056 }
1057 }
1058
1059 fn handle_refresh_table_info(
1060 &self,
1061 task: &mut Option<CompleteBarrierTask>,
1062 resps: &HashMap<WorkerId, BarrierCompleteResponse>,
1063 ) {
1064 let list_finished_info = resps
1065 .values()
1066 .flat_map(|resp| resp.list_finished_sources.clone())
1067 .collect::<Vec<_>>();
1068 if !list_finished_info.is_empty() {
1069 let task = task.get_or_insert_default();
1070 task.list_finished_source_ids.extend(list_finished_info);
1071 }
1072
1073 let load_finished_info = resps
1074 .values()
1075 .flat_map(|resp| resp.load_finished_sources.clone())
1076 .collect::<Vec<_>>();
1077 if !load_finished_info.is_empty() {
1078 let task = task.get_or_insert_default();
1079 task.load_finished_source_ids.extend(load_finished_info);
1080 }
1081
1082 let refresh_finished_table_ids: Vec<JobId> = resps
1083 .values()
1084 .flat_map(|resp| {
1085 resp.refresh_finished_tables
1086 .iter()
1087 .map(|table_id| table_id.as_job_id())
1088 })
1089 .collect::<Vec<_>>();
1090 if !refresh_finished_table_ids.is_empty() {
1091 let task = task.get_or_insert_default();
1092 task.refresh_finished_table_job_ids
1093 .extend(refresh_finished_table_ids);
1094 }
1095 }
1096}
1097
1098impl DatabaseCheckpointControl {
1099 fn handle_new_barrier(
1101 &mut self,
1102 command: Option<(Command, Vec<Notifier>)>,
1103 checkpoint: bool,
1104 span: tracing::Span,
1105 partial_graph_manager: &mut PartialGraphManager,
1106 hummock_version_stats: &HummockVersionStats,
1107 worker_nodes: &HashMap<WorkerId, WorkerNode>,
1108 ) -> MetaResult<()> {
1109 let curr_epoch = self.state.in_flight_prev_epoch().next();
1110
1111 let (command, mut notifiers) = if let Some((command, notifiers)) = command {
1112 (Some(command), notifiers)
1113 } else {
1114 (None, vec![])
1115 };
1116
1117 debug_assert!(
1118 !matches!(
1119 &command,
1120 Some(Command::RescheduleIntent {
1121 reschedule_plan: None,
1122 ..
1123 })
1124 ),
1125 "reschedule intent should be resolved before injection"
1126 );
1127
1128 if let Some(Command::DropStreamingJobs {
1129 streaming_job_ids, ..
1130 }) = &command
1131 {
1132 if streaming_job_ids.len() > 1 {
1133 for job_to_cancel in streaming_job_ids {
1134 if self
1135 .independent_checkpoint_job_controls
1136 .contains_key(job_to_cancel)
1137 {
1138 warn!(
1139 job_id = %job_to_cancel,
1140 "ignore multi-job cancel command on creating snapshot backfill streaming job"
1141 );
1142 for notifier in notifiers {
1143 notifier
1144 .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
1145 the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
1146 }
1147 return Ok(());
1148 }
1149 }
1150 } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
1151 && let Some(job) = self
1152 .independent_checkpoint_job_controls
1153 .get_mut(job_to_drop)
1154 {
1155 let dropped = job.drop(&mut notifiers, partial_graph_manager);
1156 if dropped {
1157 return Ok(());
1158 }
1159 }
1160 }
1161
1162 if let Some(Command::Throttle { jobs, .. }) = &command
1163 && jobs.len() > 1
1164 && let Some(independent_job_id) = jobs
1165 .iter()
1166 .find(|job| self.independent_checkpoint_job_controls.contains_key(*job))
1167 {
1168 warn!(
1169 job_id = %independent_job_id,
1170 "ignore multi-job throttle command on independent checkpoint job"
1171 );
1172 for notifier in notifiers {
1173 notifier.notify_start_failed(
1174 anyhow!(
1175 "cannot alter rate limit for independent checkpoint job with other jobs, \
1176 the original rate limit will be kept during recovery."
1177 )
1178 .into(),
1179 );
1180 }
1181 return Ok(());
1182 };
1183
1184 if let Some(Command::RescheduleIntent {
1185 reschedule_plan: Some(reschedule_plan),
1186 ..
1187 }) = &command
1188 && !self.independent_checkpoint_job_controls.is_empty()
1189 {
1190 let blocked_job_ids =
1191 self.collect_reschedule_blocked_jobs_for_independent_jobs_inflight()?;
1192 let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
1193 &reschedule_plan.reschedules,
1194 &reschedule_plan.fragment_actors,
1195 &blocked_job_ids,
1196 );
1197 if !blocked_reschedule_job_ids.is_empty() {
1198 warn!(
1199 blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
1200 "reject reschedule fragments related to creating unreschedulable backfill jobs"
1201 );
1202 for notifier in notifiers {
1203 notifier.notify_start_failed(
1204 anyhow!(
1205 "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
1206 blocked_reschedule_job_ids
1207 )
1208 .into(),
1209 );
1210 }
1211 return Ok(());
1212 }
1213 }
1214
1215 if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
1216 && self.database_info.is_empty()
1217 {
1218 assert!(
1219 self.independent_checkpoint_job_controls.is_empty(),
1220 "should not have snapshot backfill job when there is no normal job in database"
1221 );
1222 for mut notifier in notifiers {
1224 notifier.notify_started();
1225 notifier.notify_collected();
1226 }
1227 return Ok(());
1228 };
1229
1230 if let Some(Command::CreateStreamingJob {
1231 job_type:
1232 CreateStreamingJobType::SnapshotBackfill(_) | CreateStreamingJobType::BatchRefresh(_),
1233 ..
1234 }) = &command
1235 && self.state.is_paused()
1236 {
1237 warn!("cannot create streaming job with snapshot backfill when paused");
1238 for notifier in notifiers {
1239 notifier.notify_start_failed(
1240 anyhow!("cannot create streaming job with snapshot backfill when paused",)
1241 .into(),
1242 );
1243 }
1244 return Ok(());
1245 }
1246
1247 let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
1248 barrier_info.prev_epoch.span().in_scope(|| {
1250 tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
1251 });
1252 span.record("epoch", barrier_info.curr_epoch());
1253
1254 let epoch = barrier_info.epoch();
1255 let ApplyCommandInfo { jobs_to_wait } = match self.apply_command(
1256 command,
1257 &mut notifiers,
1258 barrier_info,
1259 partial_graph_manager,
1260 hummock_version_stats,
1261 worker_nodes,
1262 ) {
1263 Ok(info) => {
1264 assert!(notifiers.is_empty());
1265 info
1266 }
1267 Err(err) => {
1268 for notifier in notifiers {
1269 notifier.notify_start_failed(err.clone());
1270 }
1271 fail_point!("inject_barrier_err_success");
1272 return Err(err);
1273 }
1274 };
1275
1276 self.enqueue_command(epoch, jobs_to_wait);
1278
1279 Ok(())
1280 }
1281}