1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::ops::Bound::{Excluded, Unbounded};
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use itertools::Itertools;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
27use risingwave_common::util::epoch::EpochPair;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_meta_model::WorkerId;
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::{FragmentId, PartialGraphId};
33use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
34use risingwave_pb::stream_plan::stream_node::NodeBody;
35use risingwave_pb::stream_service::BarrierCompleteResponse;
36use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
37use tracing::{debug, warn};
38
39use crate::barrier::cdc_progress::CdcProgress;
40use crate::barrier::checkpoint::independent_job::IndependentCheckpointJobControl;
41use crate::barrier::checkpoint::recovery::{
42 DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
43 RecoveringStateAction,
44};
45use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
46use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
47use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
48use crate::barrier::notifier::Notifier;
49use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphStat};
50use crate::barrier::progress::TrackingJob;
51use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
52use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
53use crate::barrier::utils::{BarrierItemCollector, collect_independent_job_commit_epoch_info};
54use crate::barrier::{
55 BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
56};
57use crate::controller::fragment::InflightFragmentInfo;
58use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
59use crate::manager::MetaSrvEnv;
60
61fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
62 let mut has_unreschedulable_scan = false;
63 visit_stream_node_cont(&fragment.nodes, |node| {
64 if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
65 let scan_type = stream_scan.stream_scan_type();
66 if !scan_type.is_reschedulable(true) {
67 has_unreschedulable_scan = true;
68 return false;
69 }
70 }
71 true
72 });
73 has_unreschedulable_scan
74}
75
76fn collect_fragment_upstream_fragment_ids(
77 fragment: &InflightFragmentInfo,
78 upstream_fragment_ids: &mut HashSet<FragmentId>,
79) {
80 visit_stream_node_cont(&fragment.nodes, |node| {
81 if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
82 upstream_fragment_ids.insert(merge.upstream_fragment_id);
83 }
84 true
85 });
86}
87
88use crate::model::ActorId;
89use crate::rpc::metrics::GLOBAL_META_METRICS;
90use crate::{MetaError, MetaResult};
91
/// Top-level coordinator of barrier checkpointing across all databases.
///
/// Each database is tracked independently as either `Running` or `Recovering`
/// (see [`DatabaseCheckpointControlStatus`]).
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    /// Per-database checkpoint state.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Latest hummock version stats; refreshed on every `ack_completed`.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// Upper bound on in-flight barriers per database, copied from `env.opts`
    /// at construction/recovery time.
    pub(crate) in_flight_barrier_nums: usize,
}
99
impl CheckpointControl {
    /// Creates an empty control with no databases; the in-flight barrier limit
    /// is taken from `env.opts`.
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    /// Rebuilds the control after meta-node recovery.
    ///
    /// Databases in `databases` resume directly as `Running`; databases in
    /// `failed_databases` enter a `Recovering` state that first resets the
    /// listed partial graphs. Shared actor info for any database absent from
    /// both maps is dropped.
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        // Keep shared actor info only for databases that survive recovery.
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(
                    |(database_id, resetting_partial_graphs)| {
                        (
                            database_id,
                            DatabaseCheckpointControlStatus::Recovering(
                                DatabaseRecoveringState::new_resetting(
                                    database_id,
                                    resetting_partial_graphs,
                                ),
                            ),
                        )
                    },
                ))
                .collect(),
            hummock_version_stats,
        }
    }

    /// Acknowledges a finished complete-barrier task: refreshes version stats
    /// and forwards per-database epoch acks to the owning database controls.
    ///
    /// Panics if an acked database is missing or not running — completion of a
    /// command must be awaited before its database may enter recovery.
    pub(crate) fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        output: BarrierCompleteOutput,
    ) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, independent_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(
                    partial_graph_manager,
                    command_prev_epoch,
                    independent_job_epochs,
                );
        }
    }

    /// Collects at most one combined [`CompleteBarrierTask`] across all running
    /// databases; recovering databases are skipped. Returns `None` when no
    /// database has anything to complete.
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            // Each running database may merge its completable barriers into
            // the shared `task`.
            database.next_complete_barrier_task(
                periodic_barriers,
                partial_graph_manager,
                &mut task,
                &self.hummock_version_stats,
            );
        }
        task
    }

    /// Routes a barrier collected from workers to the owning database.
    ///
    /// A collected barrier for a recovering database panics in debug builds
    /// (it indicates a protocol violation) and is ignored with a warning in
    /// release builds.
    pub(crate) fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(_) => {
                if cfg!(debug_assertions) {
                    panic!(
                        "receive collected barrier {:?} on recovering database {} from partial graph {}",
                        collected_barrier, database_id, partial_graph_id
                    );
                } else {
                    warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
                }
                Ok(())
            }
        }
    }

    /// Ids of databases currently in the recovering state.
    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    /// Ids of databases currently in the running state.
    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// In-flight graph info of a database, or `None` if the database is absent
    /// or recovering.
    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
        self.databases
            .get(&database_id)
            .and_then(|database| database.running_state())
            .map(|database| &database.database_info)
    }

    /// Whether any database may have snapshot-backfilling jobs (recovering
    /// databases conservatively count as "may have").
    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.databases
            .values()
            .any(|database| database.may_have_snapshot_backfilling_jobs())
    }

    /// Handles a newly scheduled barrier for one database, possibly carrying a
    /// command with notifiers.
    ///
    /// For `CreateStreamingJob` commands, the cross-db snapshot-backfill epochs
    /// are first resolved from the committed epochs of the databases that own
    /// the upstream tables; if any upstream table's database cannot be found,
    /// the command fails early via the notifiers and the barrier is dropped.
    ///
    /// A command targeting a database with no checkpoint control yet is only
    /// accepted when it is a normal `CreateStreamingJob` (which bootstraps the
    /// database); other commands are rejected or skipped as noted inline.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                // Fill in the snapshot epoch for every cross-db upstream table
                // from the committed epoch of the database that owns it.
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        // Upstream table's database not found (or not yet
                        // committed): fail the command and drop this barrier.
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        // Only a *normal* create-streaming-job may bootstrap a
                        // brand-new database checkpoint control.
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        // Register the database-level partial graph before the
                        // first barrier is injected.
                        let adder = partial_graph_manager.add_partial_graph(
                            to_partial_graph_id(database_id, None),
                            DatabaseCheckpointControlMetrics::new(database_id),
                        );
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    // Commands that are trivially a no-op on an empty database:
                    // acknowledge immediately and skip the barrier.
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    // Commands that require an existing database: reject.
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                worker_nodes,
            )
        } else {
            // Command-less (periodic) barrier: silently skip for absent or
            // recovering databases, and apply back-pressure when the database
            // already has too many in-flight barriers.
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            if partial_graph_manager.inflight_barrier_num(database.partial_graph_id)
                >= self.in_flight_barrier_nums
            {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                worker_nodes,
            )
        }
    }

    /// Aggregates backfill progress of all running databases, including their
    /// independent checkpoint jobs, keyed by job id.
    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            for (job_id, job) in &database_checkpoint_control.independent_checkpoint_job_controls {
                progress.extend([(*job_id, job.gen_backfill_progress())]);
            }
        }
        progress
    }

    /// Aggregates per-fragment backfill progress of all running databases,
    /// including their independent checkpoint jobs.
    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        let mut progress = Vec::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_fragment_backfill_progress(),
            );
            for job in database_checkpoint_control
                .independent_checkpoint_job_controls
                .values()
            {
                progress.extend(job.gen_fragment_backfill_progress());
            }
        }
        progress
    }

    /// Aggregates CDC progress of all running databases, keyed by job id.
    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

    /// Ids of databases (running or recovering) that become invalid after the
    /// given worker failed, i.e. that have state placed on that worker.
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases
            .iter_mut()
            .filter_map(
                move |(database_id, database_status)| match database_status {
                    DatabaseCheckpointControlStatus::Running(control) => {
                        if !control.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        if !state.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                },
            )
    }
}
480
/// Events emitted by [`CheckpointControl::next_event`] when a recovering
/// database is ready to advance to its next recovery phase.
pub(crate) enum CheckpointControlEvent<'a> {
    /// The database may begin initializing (workers have been reset).
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// The database may resume running.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
485
impl CheckpointControl {
    /// Handles reset acknowledgements for a partial graph collected from
    /// workers.
    ///
    /// For a running database, only an independent-job partial graph may be
    /// reset (the job control is removed and notified); a database-level reset
    /// while running is a protocol violation. For a recovering database, the
    /// reset is forwarded to the recovering state machine.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(independent_job_id) = independent_job_id {
                    match database
                        .independent_checkpoint_job_controls
                        .remove(&independent_job_id)
                    {
                        Some(independent_job) => {
                            independent_job.on_partial_graph_reset();
                        }
                        None => {
                            // Panic in debug builds; tolerate in release.
                            if cfg!(debug_assertions) {
                                panic!(
                                    "receive reset partial graph resp on non-existing independent job {independent_job_id} in database {database_id}"
                                )
                            }
                            warn!(
                                %database_id,
                                %independent_job_id,
                                "ignore reset partial graph resp on non-existing independent job on running database"
                            );
                        }
                    }
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }

    /// Forwards a partial-graph-initialized notification to the recovering
    /// database; receiving one for a running database is a protocol violation.
    pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not have partial graph initialized when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.partial_graph_initialized(partial_graph_id);
            }
        }
    }

    /// Returns a future resolving to the next recovery transition among all
    /// recovering databases; pending while no database is ready to transition.
    ///
    /// The future must not be polled again after it resolves: `self` is moved
    /// into the returned action on the ready path.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        // `Option` dance: the exclusive borrow is handed to the action on
        // Ready, so the closure must give up its copy at that point.
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}
576
/// Checkpoint state of a single database: either actively processing barriers
/// or going through recovery.
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}
581
582impl DatabaseCheckpointControlStatus {
583 fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
584 match self {
585 DatabaseCheckpointControlStatus::Running(state) => Some(state),
586 DatabaseCheckpointControlStatus::Recovering(_) => None,
587 }
588 }
589
590 fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
591 match self {
592 DatabaseCheckpointControlStatus::Running(state) => Some(state),
593 DatabaseCheckpointControlStatus::Recovering(_) => None,
594 }
595 }
596
597 fn may_have_snapshot_backfilling_jobs(&self) -> bool {
598 self.running_state()
599 .map(|database| {
600 database
601 .independent_checkpoint_job_controls
602 .values()
603 .any(|job| job.is_snapshot_backfilling())
604 })
605 .unwrap_or(true) }
607
608 fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
609 match self {
610 DatabaseCheckpointControlStatus::Running(state) => state,
611 DatabaseCheckpointControlStatus::Recovering(_) => {
612 panic!("should be at running: {}", reason)
613 }
614 }
615 }
616}
617
/// Per-database barrier metrics, label-guarded by the database id.
pub(in crate::barrier) struct DatabaseCheckpointControlMetrics {
    /// Histogram of barrier latencies (seconds).
    barrier_latency: LabelGuardedHistogram,
    /// Gauge of barriers injected but not yet collected.
    in_flight_barrier_nums: LabelGuardedIntGauge,
    /// Gauge of all tracked barriers (in-flight + collected).
    all_barrier_nums: LabelGuardedIntGauge,
}
623
624impl DatabaseCheckpointControlMetrics {
625 pub(in crate::barrier) fn new(database_id: DatabaseId) -> Self {
626 let database_id_str = database_id.to_string();
627 let barrier_latency = GLOBAL_META_METRICS
628 .barrier_latency
629 .with_guarded_label_values(&[&database_id_str]);
630 let in_flight_barrier_nums = GLOBAL_META_METRICS
631 .in_flight_barrier_nums
632 .with_guarded_label_values(&[&database_id_str]);
633 let all_barrier_nums = GLOBAL_META_METRICS
634 .all_barrier_nums
635 .with_guarded_label_values(&[&database_id_str]);
636 Self {
637 barrier_latency,
638 in_flight_barrier_nums,
639 all_barrier_nums,
640 }
641 }
642}
643
644impl PartialGraphStat for DatabaseCheckpointControlMetrics {
645 fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
646 self.barrier_latency.observe(barrier_latency_secs);
647 }
648
649 fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize) {
650 self.in_flight_barrier_nums.set(inflight_barrier_num as _);
651 self.all_barrier_nums
652 .set((inflight_barrier_num + collected_barrier_num) as _);
653 }
654}
655
/// Barrier checkpoint state of a single running database.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    /// Id of the database-level partial graph (no independent-job component).
    partial_graph_id: PartialGraphId,
    pub(super) state: BarrierWorkerState,

    /// Collects, per epoch, the complete-responses and tracking handles of
    /// independent jobs that must finish before that epoch can commit.
    finishing_jobs_collector:
        BarrierItemCollector<JobId, (Vec<BarrierCompleteResponse>, TrackingJob), ()>,
    /// `prev_epoch` of the barrier currently being committed, if any.
    completing_barrier: Option<u64>,

    /// Latest committed `prev_epoch`; `None` until the first commit is acked.
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    /// Independent checkpoint jobs (e.g. creating streaming jobs), keyed by
    /// job id.
    pub independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
}
673
impl DatabaseCheckpointControl {
    /// Creates the control for a brand-new (empty) database with no committed
    /// epoch yet.
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            partial_graph_id: to_partial_graph_id(database_id, None),
            state: BarrierWorkerState::new(),
            finishing_jobs_collector: BarrierItemCollector::new(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            independent_checkpoint_job_controls: Default::default(),
        }
    }

    /// Reconstructs the control after recovery, resuming from the given
    /// committed epoch, graph info, and independent job controls.
    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
    ) -> Self {
        Self {
            database_id,
            partial_graph_id: to_partial_graph_id(database_id, None),
            state,
            finishing_jobs_collector: BarrierItemCollector::new(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            independent_checkpoint_job_controls,
        }
    }

    /// Whether this database remains valid after the given worker failed:
    /// true only if neither the database graph nor any independent job has
    /// fragments placed on that worker. Jobs without fragment info yet are
    /// treated as unaffected.
    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        !self.database_info.contains_worker(worker_id as _)
            && self
                .independent_checkpoint_job_controls
                .values()
                .all(|job| {
                    job.fragment_infos()
                        .map(|fragment_infos| {
                            !InflightFragmentInfo::contains_worker(
                                fragment_infos.values(),
                                worker_id,
                            )
                        })
                        .unwrap_or(true)
                })
    }

    /// Registers a command epoch that must wait for the given independent jobs
    /// to finish before it can commit; a no-op when there is nothing to wait
    /// for.
    fn enqueue_command(&mut self, epoch: EpochPair, independent_jobs_to_wait: HashSet<JobId>) {
        let prev_epoch = epoch.prev;
        tracing::trace!(prev_epoch, ?independent_jobs_to_wait, "enqueue command");
        if !independent_jobs_to_wait.is_empty() {
            self.finishing_jobs_collector
                .enqueue(epoch, independent_jobs_to_wait, ());
        }
    }

    /// Handles a barrier collected for one of this database's partial graphs.
    ///
    /// Only independent-job partial graphs are processed here; a barrier for
    /// the database-level graph falls through to `Ok(())` (presumably tracked
    /// by the partial graph manager — TODO confirm). A job may request that the
    /// next barrier be forced into a checkpoint.
    fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let prev_epoch = collected_barrier.epoch.prev;
        tracing::trace!(
            prev_epoch,
            partial_graph_id = %partial_graph_id,
            "barrier collected"
        );
        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
        assert_eq!(self.database_id, database_id);
        if let Some(independent_job_id) = independent_job_id {
            let job = self
                .independent_checkpoint_job_controls
                .get_mut(&independent_job_id)
                .expect("should exist");
            let should_force_checkpoint = job.collect(collected_barrier);
            if should_force_checkpoint {
                periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
            }
        }
        Ok(())
    }
}
763
impl DatabaseCheckpointControl {
    /// Pinned upstream log epochs of all independent checkpoint jobs, keyed by
    /// job id.
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.independent_checkpoint_job_controls
            .iter()
            .map(|(job_id, job)| (*job_id, job.pinned_upstream_log_epoch()))
            .collect()
    }

    /// Collects all `(upstream, downstream)` no-shuffle fragment edges in this
    /// database, from both the database graph and independent jobs that have
    /// fragment info. An edge is recorded for every `Merge` node whose
    /// upstream dispatcher type is `NoShuffle`.
    fn collect_no_shuffle_fragment_relations_for_reschedule_check(
        &self,
    ) -> Vec<(FragmentId, FragmentId)> {
        let mut no_shuffle_relations = Vec::new();
        for fragment in self.database_info.fragment_infos() {
            let downstream_fragment_id = fragment.fragment_id;
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
                    && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
                {
                    no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
                }
                true
            });
        }

        for job in self.independent_checkpoint_job_controls.values() {
            if let Some(fragment_infos) = job.fragment_infos() {
                for fragment_info in fragment_infos.values() {
                    let downstream_fragment_id = fragment_info.fragment_id;
                    visit_stream_node_cont(&fragment_info.nodes, |node| {
                        if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
                            && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
                        {
                            no_shuffle_relations
                                .push((merge.upstream_fragment_id, downstream_fragment_id));
                        }
                        true
                    });
                }
            }
        }
        no_shuffle_relations
    }

    /// Computes the set of jobs that must not be rescheduled while independent
    /// jobs are in flight.
    ///
    /// Seeds: fragments of independent jobs containing an online
    /// unreschedulable scan, plus their direct merge upstreams. The seed set is
    /// then closed over connected no-shuffle graphs, and finally mapped to the
    /// owning job ids in the database graph.
    fn collect_reschedule_blocked_jobs_for_independent_jobs_inflight(
        &self,
    ) -> MetaResult<HashSet<JobId>> {
        let mut initial_blocked_fragment_ids = HashSet::new();
        for job in self.independent_checkpoint_job_controls.values() {
            if let Some(fragment_infos) = job.fragment_infos() {
                for fragment_info in fragment_infos.values() {
                    if fragment_has_online_unreschedulable_scan(fragment_info) {
                        initial_blocked_fragment_ids.insert(fragment_info.fragment_id);
                        collect_fragment_upstream_fragment_ids(
                            fragment_info,
                            &mut initial_blocked_fragment_ids,
                        );
                    }
                }
            }
        }

        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
        if !initial_blocked_fragment_ids.is_empty() {
            // Expand the seeds over whole no-shuffle graphs, since no-shuffle
            // connected fragments reschedule together.
            let no_shuffle_relations =
                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
            let (forward_edges, backward_edges) =
                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
            let initial_blocked_fragment_ids: Vec<_> =
                initial_blocked_fragment_ids.iter().copied().collect();
            for ensemble in find_no_shuffle_graphs(
                &initial_blocked_fragment_ids,
                &forward_edges,
                &backward_edges,
            )? {
                blocked_fragment_ids.extend(ensemble.fragments());
            }
        }

        let mut blocked_job_ids = HashSet::new();
        blocked_job_ids.extend(
            blocked_fragment_ids
                .into_iter()
                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
        );
        Ok(blocked_job_ids)
    }

    /// Of the jobs touched by the given reschedules (rescheduled fragments,
    /// their downstreams, and upstream dispatchers), returns those that are in
    /// `blocked_job_ids`.
    fn collect_reschedule_blocked_job_ids(
        &self,
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        blocked_job_ids: &HashSet<JobId>,
    ) -> HashSet<JobId> {
        let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
        affected_fragment_ids.extend(fragment_actors.keys().copied());
        for reschedule in reschedules.values() {
            affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
            affected_fragment_ids.extend(
                reschedule
                    .upstream_fragment_dispatcher_ids
                    .iter()
                    .map(|(fragment_id, _)| *fragment_id),
            );
        }

        affected_fragment_ids
            .into_iter()
            .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
            .filter(|job_id| blocked_job_ids.contains(job_id))
            .collect()
    }

    /// Merges this database's completable barriers into the shared `task`.
    ///
    /// Phase 1 (requires a committed epoch): drives independent jobs'
    /// `start_completing`; jobs reaching their finish epoch have their partial
    /// graphs removed and their responses/tracking handles parked in
    /// `finishing_jobs_collector` until the blocking command epoch commits.
    /// Phase 2: completes the database-level barrier (bounded so it never
    /// overtakes an epoch still waiting on finishing jobs), folds in any
    /// now-finished jobs' responses, and builds the commit info. If nothing
    /// completes but finished jobs are pending without an upcoming checkpoint
    /// barrier, a checkpoint is forced on the next barrier.
    fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
        task: &mut Option<CompleteBarrierTask>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut independent_jobs_task = vec![];
        if let Some(committed_epoch) = self.committed_epoch {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = partial_graph_manager
                .first_inflight_barrier(self.partial_graph_id)
                .map(|epoch| epoch.prev);
            for (job_id, job) in &mut self.independent_checkpoint_job_controls {
                let IndependentCheckpointJobControl::CreatingStreamingJob(job) = job;
                if let Some((epoch, resps, info, is_finish_epoch)) = job.start_completing(
                    partial_graph_manager,
                    min_upstream_inflight_barrier,
                    committed_epoch,
                ) {
                    let resps = resps.into_values().collect_vec();
                    if is_finish_epoch {
                        // Finish epochs carry no notifiers of their own; they
                        // are committed together with a command epoch below.
                        assert!(info.notifiers.is_empty());
                        finished_jobs.push((*job_id, epoch, resps));
                        continue;
                    };
                    independent_jobs_task.push((*job_id, epoch, resps, info));
                }
            }

            if !finished_jobs.is_empty() {
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, ..)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                debug!(epoch, %job_id, "finish creating job");
                let IndependentCheckpointJobControl::CreatingStreamingJob(creating_streaming_job) =
                    self.independent_checkpoint_job_controls
                        .remove(&job_id)
                        .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();
                // Park the finished job's responses until the waiting command
                // epoch becomes completable.
                self.finishing_jobs_collector
                    .collect(epoch, job_id, (resps, tracking_job));
            }
        }
        let mut observed_non_checkpoint = false;
        self.finishing_jobs_collector.advance_collected();
        // Never complete past the earliest epoch still waiting on finishing
        // jobs.
        let epoch_end_bound = self
            .finishing_jobs_collector
            .first_inflight_epoch()
            .map_or(Unbounded, |epoch| Excluded(epoch.prev));
        if let Some((epoch, resps, info)) = partial_graph_manager.start_completing(
            self.partial_graph_id,
            epoch_end_bound,
            |_, resps, post_collect_command| {
                // Callback path: applies the collected command without
                // producing a commit (and flags it — presumably these are
                // non-checkpoint barriers; TODO confirm).
                observed_non_checkpoint = true;
                self.handle_refresh_table_info(task, &resps);
                self.database_info.apply_collected_command(
                    &post_collect_command,
                    &resps,
                    hummock_version_stats,
                );
            },
        ) {
            self.handle_refresh_table_info(task, &resps);
            self.database_info.apply_collected_command(
                &info.post_collect_command,
                &resps,
                hummock_version_stats,
            );
            let mut resps_to_commit = resps.into_values().collect_vec();
            let mut staging_commit_info = self.database_info.take_staging_commit_info();
            // Fold in finished jobs whose waiting epoch is exactly this one.
            if let Some((_, finished_jobs, _)) =
                self.finishing_jobs_collector
                    .take_collected_if(|collected_epoch| {
                        assert!(epoch <= collected_epoch.prev);
                        epoch == collected_epoch.prev
                    })
            {
                finished_jobs
                    .into_iter()
                    .for_each(|(_, (resps, tracking_job))| {
                        resps_to_commit.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
            }
            {
                let task = task.get_or_insert_default();
                Command::collect_commit_epoch_info(
                    &self.database_info,
                    &info,
                    &mut task.commit_info,
                    resps_to_commit,
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                // Remember the epoch being committed for `ack_completed`.
                self.completing_barrier = Some(info.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.epoch_infos
                    .try_insert(self.partial_graph_id, info)
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
            }
        } else if observed_non_checkpoint
            && self.database_info.has_pending_finished_jobs()
            && !partial_graph_manager.has_pending_checkpoint_barrier(self.partial_graph_id)
        {
            // Finished jobs are waiting but no checkpoint barrier is scheduled:
            // force one so they can commit.
            periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
        }
        if !independent_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, info) in independent_jobs_task {
                collect_independent_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    &info,
                );
                task.epoch_infos
                    .try_insert(to_partial_graph_id(self.database_id, Some(job_id)), info)
                    .expect("non duplicate");
            }
        }
    }

    /// Acknowledges commit completion for this database.
    ///
    /// Consumes `completing_barrier` (asserting it matches the acked command
    /// epoch), records the new committed epoch, and forwards per-job acks;
    /// jobs already removed (finished) are skipped silently.
    fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        command_prev_epoch: Option<u64>,
        independent_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
                partial_graph_manager.ack_completed(self.partial_graph_id, prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in independent_job_epochs {
                if let Some(job) = self.independent_checkpoint_job_controls.get_mut(&job_id) {
                    job.ack_completed(partial_graph_manager, epoch);
                }
            }
        }
    }

    /// Harvests list-finished / load-finished source ids and refresh-finished
    /// table job ids from worker responses into the pending task, allocating
    /// the task only when there is something to record.
    fn handle_refresh_table_info(
        &self,
        task: &mut Option<CompleteBarrierTask>,
        resps: &HashMap<WorkerId, BarrierCompleteResponse>,
    ) {
        let list_finished_info = resps
            .values()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = resps
            .values()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = resps
            .values()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}
1077
impl DatabaseCheckpointControl {
    /// Inject the next barrier for this database, optionally carrying a
    /// `command` together with its `Notifier`s.
    ///
    /// Several command shapes are validated first and, when invalid, rejected
    /// by failing the notifiers and returning `Ok(())` without injecting:
    /// - a multi-job `DropStreamingJobs` that includes an independent
    ///   (snapshot-backfill) checkpoint job;
    /// - a multi-job `Throttle` that includes an independent checkpoint job;
    /// - a resolved `RescheduleIntent` whose plan touches fragments of
    ///   creating jobs with unreschedulable backfill;
    /// - a snapshot-backfill `CreateStreamingJob` while the database is paused.
    ///
    /// Returns `Err` only when `apply_command` fails; the notifiers are failed
    /// with that same error before it is propagated.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        // The new barrier's curr epoch is the successor of the latest
        // in-flight prev epoch tracked by the worker state.
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        // Split the optional (command, notifiers) pair; a command-less barrier
        // carries no notifiers.
        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        // An unresolved reschedule intent (plan still `None`) must never reach
        // barrier injection; per the assert message, it is resolved earlier.
        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                // Reject a multi-job cancel that includes a creating
                // snapshot-backfill job: such a job must be cancelled alone.
                for job_to_cancel in streaming_job_ids {
                    if self
                        .independent_checkpoint_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
                                the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(job) = self
                    .independent_checkpoint_job_controls
                    .get_mut(job_to_drop)
            {
                // A single-job drop targeting an independent checkpoint job is
                // delegated to that job's own control; if it fully handled the
                // drop, no barrier needs to be injected here.
                let dropped = job.drop(&mut notifiers, partial_graph_manager);
                if dropped {
                    return Ok(());
                }
            }
        }

        // Reject a multi-job throttle that includes an independent checkpoint
        // job: its rate limit must be altered in a dedicated command.
        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(independent_job_id) = jobs
                .iter()
                .find(|job| self.independent_checkpoint_job_controls.contains_key(*job))
        {
            warn!(
                job_id = %independent_job_id,
                "ignore multi-job throttle command on independent checkpoint job"
            );
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot alter rate limit for independent checkpoint job with other jobs, \
                        the original rate limit will be kept during recovery."
                    )
                    .into(),
                );
            }
            return Ok(());
        };

        // With independent checkpoint jobs in flight, a resolved reschedule
        // plan may not touch fragments of creating jobs whose backfill cannot
        // be rescheduled.
        if let Some(Command::RescheduleIntent {
            reschedule_plan: Some(reschedule_plan),
            ..
        }) = &command
            && !self.independent_checkpoint_job_controls.is_empty()
        {
            let blocked_job_ids =
                self.collect_reschedule_blocked_jobs_for_independent_jobs_inflight()?;
            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
                &reschedule_plan.reschedules,
                &reschedule_plan.fragment_actors,
                &blocked_job_ids,
            );
            if !blocked_reschedule_job_ids.is_empty() {
                warn!(
                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
                );
                for notifier in notifiers {
                    notifier.notify_start_failed(
                        anyhow!(
                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
                            blocked_reschedule_job_ids
                        )
                        .into(),
                    );
                }
                return Ok(());
            }
        }

        // An empty database (and no job-creating command) needs no barrier:
        // acknowledge the notifiers immediately and skip injection.
        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.independent_checkpoint_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        // Snapshot backfill cannot start while the database is paused.
        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        // All checks passed: materialize the barrier info and record tracing
        // metadata for the new barrier.
        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        let epoch = barrier_info.epoch();
        // Apply the command to the in-flight graph state. On success,
        // `apply_command` must have consumed all notifiers (asserted below).
        let ApplyCommandInfo { jobs_to_wait } = match self.apply_command(
            command,
            &mut notifiers,
            barrier_info,
            partial_graph_manager,
            hummock_version_stats,
            worker_nodes,
        ) {
            Ok(info) => {
                assert!(notifiers.is_empty());
                info
            }
            Err(err) => {
                // Fail any notifiers `apply_command` did not consume, then
                // propagate the error.
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                // Failpoint hook for error-injection testing.
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Track the enqueued barrier so later collection/completion can
        // match it by epoch.
        self.enqueue_command(epoch, jobs_to_wait);

        Ok(())
    }
}