1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::task::Poll;
19
20use anyhow::anyhow;
21use fail::fail_point;
22use risingwave_common::catalog::{DatabaseId, TableId};
23use risingwave_common::id::JobId;
24use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
25use risingwave_common::system_param::AdaptiveParallelismStrategy;
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_meta_model::WorkerId;
29use risingwave_pb::common::WorkerNode;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::id::{FragmentId, PartialGraphId};
32use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
33use risingwave_pb::stream_plan::stream_node::NodeBody;
34use risingwave_pb::stream_service::BarrierCompleteResponse;
35use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
36use tracing::{debug, warn};
37
38use crate::barrier::cdc_progress::CdcProgress;
39use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
40use crate::barrier::checkpoint::recovery::{
41 DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
42 RecoveringStateAction,
43};
44use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
45use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
46use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
47use crate::barrier::notifier::Notifier;
48use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphStat};
49use crate::barrier::progress::TrackingJob;
50use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
51use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
52use crate::barrier::utils::collect_creating_job_commit_epoch_info;
53use crate::barrier::{
54 BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
55};
56use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
57use crate::manager::MetaSrvEnv;
58use crate::model::ActorId;
59use crate::rpc::metrics::GLOBAL_META_METRICS;
60use crate::{MetaError, MetaResult};
61
/// Top-level controller of barrier checkpointing across all databases.
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    /// Per-database control state: either running normally or recovering.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Latest hummock version stats, refreshed on each completed barrier batch.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// Max number of in-flight (not yet collected) barriers per database,
    /// taken from `env.opts.in_flight_barrier_nums`.
    pub(crate) in_flight_barrier_nums: usize,
}
69
70impl CheckpointControl {
71 pub fn new(env: MetaSrvEnv) -> Self {
72 Self {
73 in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
74 env,
75 databases: Default::default(),
76 hummock_version_stats: Default::default(),
77 }
78 }
79
80 pub(crate) fn recover(
81 databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
82 failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, hummock_version_stats: HummockVersionStats,
84 env: MetaSrvEnv,
85 ) -> Self {
86 env.shared_actor_infos()
87 .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
88 Self {
89 in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
90 env,
91 databases: databases
92 .into_iter()
93 .map(|(database_id, control)| {
94 (
95 database_id,
96 DatabaseCheckpointControlStatus::Running(control),
97 )
98 })
99 .chain(failed_databases.into_iter().map(
100 |(database_id, resetting_partial_graphs)| {
101 (
102 database_id,
103 DatabaseCheckpointControlStatus::Recovering(
104 DatabaseRecoveringState::new_resetting(
105 database_id,
106 resetting_partial_graphs,
107 ),
108 ),
109 )
110 },
111 ))
112 .collect(),
113 hummock_version_stats,
114 }
115 }
116
117 pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
118 self.hummock_version_stats = output.hummock_version_stats;
119 for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
120 self.databases
121 .get_mut(&database_id)
122 .expect("should exist")
123 .expect_running("should have wait for completing command before enter recovery")
124 .ack_completed(command_prev_epoch, creating_job_epochs);
125 }
126 }
127
128 pub(crate) fn next_complete_barrier_task(
129 &mut self,
130 periodic_barriers: &mut PeriodicBarriers,
131 partial_graph_manager: &mut PartialGraphManager,
132 ) -> Option<CompleteBarrierTask> {
133 let mut task = None;
134 for database in self.databases.values_mut() {
135 let Some(database) = database.running_state_mut() else {
136 continue;
137 };
138 database.next_complete_barrier_task(
139 periodic_barriers,
140 partial_graph_manager,
141 &mut task,
142 &self.hummock_version_stats,
143 );
144 }
145 task
146 }
147
148 pub(crate) fn barrier_collected(
149 &mut self,
150 partial_graph_id: PartialGraphId,
151 collected_barrier: CollectedBarrier<'_>,
152 periodic_barriers: &mut PeriodicBarriers,
153 ) -> MetaResult<()> {
154 let (database_id, _) = from_partial_graph_id(partial_graph_id);
155 let database_status = self.databases.get_mut(&database_id).expect("should exist");
156 match database_status {
157 DatabaseCheckpointControlStatus::Running(database) => {
158 database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
159 }
160 DatabaseCheckpointControlStatus::Recovering(_) => {
161 if cfg!(debug_assertions) {
162 panic!(
163 "receive collected barrier {:?} on recovering database {} from partial graph {}",
164 collected_barrier, database_id, partial_graph_id
165 );
166 } else {
167 warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
168 }
169 Ok(())
170 }
171 }
172 }
173
174 pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
175 self.databases.iter().filter_map(|(database_id, database)| {
176 database.running_state().is_none().then_some(*database_id)
177 })
178 }
179
180 pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
181 self.databases.iter().filter_map(|(database_id, database)| {
182 database.running_state().is_some().then_some(*database_id)
183 })
184 }
185
186 pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
187 self.databases
188 .get(&database_id)
189 .and_then(|database| database.running_state())
190 .map(|database| &database.database_info)
191 }
192
193 pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
194 self.databases
195 .values()
196 .any(|database| database.may_have_snapshot_backfilling_jobs())
197 }
198
    /// Handle a newly scheduled barrier for a database: resolve the command
    /// (if any), create the per-database control on its first job, and inject
    /// the barrier into the database's worker state.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            // For cross-database snapshot backfill, fill each upstream table's
            // snapshot epoch from the committed epoch of the database that owns it.
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    // No running database owns the upstream table: fail the command
                    // to its notifiers instead of injecting the barrier.
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                // First command for an unknown database: only a *normal*
                // `CreateStreamingJob` may create the database control.
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            // Non-normal first job: bug in debug builds, otherwise
                            // fail the command back to its notifiers.
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        // Register the database's own partial graph (no creating job)
                        // together with its metrics.
                        let adder = partial_graph_manager.add_partial_graph(
                            to_partial_graph_id(database_id, None),
                            DatabaseCheckpointControlMetrics::new(database_id),
                        );
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    // Commands that are trivially satisfied on an empty database:
                    // acknowledge immediately and skip the barrier.
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    // Commands that require an existing database graph: bug in
                    // debug builds, otherwise fail the command to its notifiers.
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        } else {
            // Command-less (periodic) barrier: silently skip when the database is
            // unknown, recovering, or already at the in-flight barrier limit.
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        }
    }
363
364 pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
365 let mut progress = HashMap::new();
366 for status in self.databases.values() {
367 let Some(database_checkpoint_control) = status.running_state() else {
368 continue;
369 };
370 progress.extend(
372 database_checkpoint_control
373 .database_info
374 .gen_backfill_progress(),
375 );
376 for (job_id, creating_job) in
378 &database_checkpoint_control.creating_streaming_job_controls
379 {
380 progress.extend([(*job_id, creating_job.gen_backfill_progress())]);
381 }
382 }
383 progress
384 }
385
386 pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
387 let mut progress = Vec::new();
388 for status in self.databases.values() {
389 let Some(database_checkpoint_control) = status.running_state() else {
390 continue;
391 };
392 progress.extend(
393 database_checkpoint_control
394 .database_info
395 .gen_fragment_backfill_progress(),
396 );
397 for creating_job in database_checkpoint_control
398 .creating_streaming_job_controls
399 .values()
400 {
401 progress.extend(creating_job.gen_fragment_backfill_progress());
402 }
403 }
404 progress
405 }
406
407 pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
408 let mut progress = HashMap::new();
409 for status in self.databases.values() {
410 let Some(database_checkpoint_control) = status.running_state() else {
411 continue;
412 };
413 progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
415 }
416 progress
417 }
418
419 pub(crate) fn databases_failed_at_worker_err(
420 &mut self,
421 worker_id: WorkerId,
422 ) -> impl Iterator<Item = DatabaseId> + '_ {
423 self.databases
424 .iter_mut()
425 .filter_map(
426 move |(database_id, database_status)| match database_status {
427 DatabaseCheckpointControlStatus::Running(control) => {
428 if !control.is_valid_after_worker_err(worker_id) {
429 Some(*database_id)
430 } else {
431 None
432 }
433 }
434 DatabaseCheckpointControlStatus::Recovering(state) => {
435 if !state.is_valid_after_worker_err(worker_id) {
436 Some(*database_id)
437 } else {
438 None
439 }
440 }
441 },
442 )
443 }
444}
445
/// Events yielded by [`CheckpointControl::next_event`] when a recovering
/// database is ready to advance to its next recovery stage.
pub(crate) enum CheckpointControlEvent<'a> {
    /// The database may start initializing (resetting) its partial graphs.
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// The database has finished recovering and may resume running.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
450
451impl CheckpointControl {
    /// Handle workers' responses to a partial-graph reset request.
    ///
    /// For a running database this can only concern a creating job, whose
    /// control is removed and notified of the reset; a reset of the database
    /// graph itself is only expected while recovering and is forwarded to the
    /// recovery state.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, creating_job) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(creating_job_id) = creating_job {
                    let Some(job) = database
                        .creating_streaming_job_controls
                        .remove(&creating_job_id)
                    else {
                        // Stale response: crash in debug builds, tolerate in release.
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job {creating_job_id} in database {database_id}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            "ignore reset partial graph resp on non-existing creating job on running database"
                        );
                        return;
                    };
                    job.on_partial_graph_reset();
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }
487
488 pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
489 let (database_id, _) = from_partial_graph_id(partial_graph_id);
490 match self.databases.get_mut(&database_id).expect("should exist") {
491 DatabaseCheckpointControlStatus::Running(_) => {
492 unreachable!("should not have partial graph initialized when running")
493 }
494 DatabaseCheckpointControlStatus::Recovering(state) => {
495 state.partial_graph_initialized(partial_graph_id);
496 }
497 }
498 }
499
    /// Wait for the next recovery event from any recovering database; running
    /// databases are skipped.
    ///
    /// The returned future must not be polled again after it yields `Ready`.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        // `self` is held in an Option so the mutable borrow taken while polling
        // can be released before building the returned action from `self`.
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            // Take ownership of `self` back to construct the action.
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
537}
538
/// Lifecycle status of a single database's checkpoint control.
pub(crate) enum DatabaseCheckpointControlStatus {
    /// Barriers are injected and collected normally.
    Running(DatabaseCheckpointControl),
    /// The database is undergoing recovery; barriers are not processed.
    Recovering(DatabaseRecoveringState),
}
543
544impl DatabaseCheckpointControlStatus {
545 fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
546 match self {
547 DatabaseCheckpointControlStatus::Running(state) => Some(state),
548 DatabaseCheckpointControlStatus::Recovering(_) => None,
549 }
550 }
551
552 fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
553 match self {
554 DatabaseCheckpointControlStatus::Running(state) => Some(state),
555 DatabaseCheckpointControlStatus::Recovering(_) => None,
556 }
557 }
558
559 fn may_have_snapshot_backfilling_jobs(&self) -> bool {
560 self.running_state()
561 .map(|database| !database.creating_streaming_job_controls.is_empty())
562 .unwrap_or(true) }
564
565 fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
566 match self {
567 DatabaseCheckpointControlStatus::Running(state) => state,
568 DatabaseCheckpointControlStatus::Recovering(_) => {
569 panic!("should be at running: {}", reason)
570 }
571 }
572 }
573}
574
/// Label-guarded barrier metrics for a single database.
pub(in crate::barrier) struct DatabaseCheckpointControlMetrics {
    /// Histogram of barrier latency (seconds).
    barrier_latency: LabelGuardedHistogram,
    /// Gauge of currently in-flight barriers.
    in_flight_barrier_nums: LabelGuardedIntGauge,
    /// Gauge of all tracked barriers (in-flight + collected).
    all_barrier_nums: LabelGuardedIntGauge,
}
580
581impl DatabaseCheckpointControlMetrics {
582 pub(in crate::barrier) fn new(database_id: DatabaseId) -> Self {
583 let database_id_str = database_id.to_string();
584 let barrier_latency = GLOBAL_META_METRICS
585 .barrier_latency
586 .with_guarded_label_values(&[&database_id_str]);
587 let in_flight_barrier_nums = GLOBAL_META_METRICS
588 .in_flight_barrier_nums
589 .with_guarded_label_values(&[&database_id_str]);
590 let all_barrier_nums = GLOBAL_META_METRICS
591 .all_barrier_nums
592 .with_guarded_label_values(&[&database_id_str]);
593 Self {
594 barrier_latency,
595 in_flight_barrier_nums,
596 all_barrier_nums,
597 }
598 }
599}
600
601impl PartialGraphStat for DatabaseCheckpointControlMetrics {
602 fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
603 self.barrier_latency.observe(barrier_latency_secs);
604 }
605
606 fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize) {
607 self.in_flight_barrier_nums.set(inflight_barrier_num as _);
608 self.all_barrier_nums
609 .set((inflight_barrier_num + collected_barrier_num) as _);
610 }
611}
612
/// Barrier checkpoint state of a single database.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    /// Partial graph id of the database graph itself (creating-job part is `None`).
    partial_graph_id: PartialGraphId,
    pub(super) state: BarrierWorkerState,

    /// Barriers injected but not yet completed, keyed by their prev epoch;
    /// the `BTreeMap` keeps them in epoch order.
    command_ctx_queue: BTreeMap<u64, BarrierEpochState>,
    /// Prev epoch of the barrier currently being completed, if any.
    completing_barrier: Option<u64>,

    /// Prev epoch of the latest committed barrier; `None` before the first commit.
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    /// Snapshot-backfilling streaming jobs still being created, by job id.
    pub creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
}
631
632impl DatabaseCheckpointControl {
633 fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
634 Self {
635 database_id,
636 partial_graph_id: to_partial_graph_id(database_id, None),
637 state: BarrierWorkerState::new(),
638 command_ctx_queue: Default::default(),
639 completing_barrier: None,
640 committed_epoch: None,
641 database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
642 creating_streaming_job_controls: Default::default(),
643 }
644 }
645
646 pub(crate) fn recovery(
647 database_id: DatabaseId,
648 state: BarrierWorkerState,
649 committed_epoch: u64,
650 database_info: InflightDatabaseInfo,
651 creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
652 ) -> Self {
653 Self {
654 database_id,
655 partial_graph_id: to_partial_graph_id(database_id, None),
656 state,
657 command_ctx_queue: Default::default(),
658 completing_barrier: None,
659 committed_epoch: Some(committed_epoch),
660 database_info,
661 creating_streaming_job_controls,
662 }
663 }
664
665 pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
666 !self.database_info.contains_worker(worker_id as _)
667 && self
668 .creating_streaming_job_controls
669 .values()
670 .all(|job| job.is_valid_after_worker_err(worker_id))
671 }
672
673 fn enqueue_command(&mut self, prev_epoch: u64, creating_jobs_to_wait: HashSet<JobId>) {
675 tracing::trace!(prev_epoch, ?creating_jobs_to_wait, "enqueue command");
676 self.command_ctx_queue.insert(
677 prev_epoch,
678 BarrierEpochState {
679 is_collected: false,
680 creating_jobs_to_wait,
681 finished_jobs: HashMap::new(),
682 },
683 );
684 }
685
686 fn barrier_collected(
689 &mut self,
690 partial_graph_id: PartialGraphId,
691 collected_barrier: CollectedBarrier<'_>,
692 periodic_barriers: &mut PeriodicBarriers,
693 ) -> MetaResult<()> {
694 let prev_epoch = collected_barrier.epoch.prev;
695 tracing::trace!(
696 prev_epoch,
697 partial_graph_id = %partial_graph_id,
698 "barrier collected"
699 );
700 let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
701 assert_eq!(self.database_id, database_id);
702 match creating_job_id {
703 None => {
704 if let Some(state) = self.command_ctx_queue.get_mut(&prev_epoch) {
705 assert!(!state.is_collected);
706 state.is_collected = true;
707 } else {
708 panic!(
709 "collect barrier on non-existing barrier: {}, {:?}",
710 prev_epoch, collected_barrier
711 );
712 }
713 }
714 Some(creating_job_id) => {
715 let should_merge_to_upstream = self
716 .creating_streaming_job_controls
717 .get_mut(&creating_job_id)
718 .expect("should exist")
719 .collect(collected_barrier);
720 if should_merge_to_upstream {
721 periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
722 }
723 }
724 }
725 Ok(())
726 }
727
728 fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
730 self.command_ctx_queue
731 .values()
732 .filter(|state| state.is_inflight())
733 .count()
734 < in_flight_barrier_nums
735 }
736}
737
738impl DatabaseCheckpointControl {
739 fn collect_backfill_pinned_upstream_log_epoch(
741 &self,
742 ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
743 self.creating_streaming_job_controls
744 .iter()
745 .map(|(job_id, creating_job)| (*job_id, creating_job.pinned_upstream_log_epoch()))
746 .collect()
747 }
748
749 fn collect_no_shuffle_fragment_relations_for_reschedule_check(
750 &self,
751 ) -> Vec<(FragmentId, FragmentId)> {
752 let mut no_shuffle_relations = Vec::new();
753 for fragment in self.database_info.fragment_infos() {
754 let downstream_fragment_id = fragment.fragment_id;
755 visit_stream_node_cont(&fragment.nodes, |node| {
756 if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
757 && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
758 {
759 no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
760 }
761 true
762 });
763 }
764
765 for creating_job in self.creating_streaming_job_controls.values() {
766 creating_job.collect_no_shuffle_fragment_relations(&mut no_shuffle_relations);
767 }
768 no_shuffle_relations
769 }
770
771 fn collect_reschedule_blocked_jobs_for_creating_jobs_inflight(
772 &self,
773 ) -> MetaResult<HashSet<JobId>> {
774 let mut initial_blocked_fragment_ids = HashSet::new();
775 for creating_job in self.creating_streaming_job_controls.values() {
776 creating_job.collect_reschedule_blocked_fragment_ids(&mut initial_blocked_fragment_ids);
777 }
778
779 let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
780 if !initial_blocked_fragment_ids.is_empty() {
781 let no_shuffle_relations =
782 self.collect_no_shuffle_fragment_relations_for_reschedule_check();
783 let (forward_edges, backward_edges) =
784 build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
785 let initial_blocked_fragment_ids: Vec<_> =
786 initial_blocked_fragment_ids.iter().copied().collect();
787 for ensemble in find_no_shuffle_graphs(
788 &initial_blocked_fragment_ids,
789 &forward_edges,
790 &backward_edges,
791 )? {
792 blocked_fragment_ids.extend(ensemble.fragments());
793 }
794 }
795
796 let mut blocked_job_ids = HashSet::new();
797 blocked_job_ids.extend(
798 blocked_fragment_ids
799 .into_iter()
800 .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
801 );
802 Ok(blocked_job_ids)
803 }
804
805 fn collect_reschedule_blocked_job_ids(
806 &self,
807 reschedules: &HashMap<FragmentId, Reschedule>,
808 fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
809 blocked_job_ids: &HashSet<JobId>,
810 ) -> HashSet<JobId> {
811 let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
812 affected_fragment_ids.extend(fragment_actors.keys().copied());
813 for reschedule in reschedules.values() {
814 affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
815 affected_fragment_ids.extend(
816 reschedule
817 .upstream_fragment_dispatcher_ids
818 .iter()
819 .map(|(fragment_id, _)| *fragment_id),
820 );
821 }
822
823 affected_fragment_ids
824 .into_iter()
825 .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
826 .filter(|job_id| blocked_job_ids.contains(job_id))
827 .collect()
828 }
829
    /// Move barriers that are fully collected into the shared
    /// `CompleteBarrierTask`, at most one checkpoint barrier per call.
    ///
    /// Also drives completion of creating snapshot-backfill jobs and folds
    /// finished ones into the barrier state they finish at.
    fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
        task: &mut Option<CompleteBarrierTask>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        // Creating jobs can only start completing once the database has a
        // committed epoch to anchor against.
        if let Some(committed_epoch) = self.committed_epoch {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, info, is_finish_epoch)) = job.start_completing(
                    partial_graph_manager,
                    min_upstream_inflight_barrier,
                    committed_epoch,
                ) {
                    if is_finish_epoch {
                        // Finish epoch: the job commits together with the upstream
                        // barrier at `epoch` instead of on its own.
                        assert!(info.notifiers.is_empty());
                        finished_jobs.push((*job_id, epoch, resps));
                        continue;
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, info));
                }
            }

            if !finished_jobs.is_empty() {
                // Partial graphs of finished jobs are no longer needed.
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, ..)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                // Attach the finished job to the barrier it finishes at, so the
                // barrier only completes once the job is folded in.
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist");
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        // Pop barriers from the front of the queue while they are no longer in
        // flight; non-checkpoint barriers are drained, the first checkpoint
        // barrier becomes the `completing_barrier` and stops the loop.
        while let Some((_, state)) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (epoch, state) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(state.creating_jobs_to_wait.is_empty());
                assert!(state.is_collected);

                let (resps, mut info) =
                    partial_graph_manager.take_collected_barrier(self.partial_graph_id, epoch);

                self.handle_refresh_table_info(task, &resps);
                self.database_info.apply_collected_command(
                    &info.post_collect_command,
                    &resps,
                    hummock_version_stats,
                );

                let mut resps_to_commit = resps;
                if !info.barrier_info.kind.is_checkpoint() {
                    // Non-checkpoint barrier: notify and continue without commit.
                    info.notifiers.drain(..).for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    // If finished jobs are waiting, make sure a checkpoint barrier
                    // comes soon so they can actually be committed.
                    if self.database_info.has_pending_finished_jobs()
                        && !partial_graph_manager
                            .has_pending_checkpoint_barrier(self.partial_graph_id)
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info = self.database_info.take_staging_commit_info();
                state
                    .finished_jobs
                    .into_iter()
                    .for_each(|(_, (resps, tracking_job))| {
                        resps_to_commit.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                Command::collect_commit_epoch_info(
                    &self.database_info,
                    &info,
                    &mut task.commit_info,
                    resps_to_commit,
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(info.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.epoch_infos
                    .try_insert(self.partial_graph_id, info)
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            // Commit info for creating jobs that are completing independently.
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, info) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(&mut task.commit_info, epoch, resps, &info);
                task.epoch_infos
                    .try_insert(to_partial_graph_id(self.database_id, Some(job_id)), info)
                    .expect("non duplicate");
            }
        }
    }
963
964 fn ack_completed(
965 &mut self,
966 command_prev_epoch: Option<u64>,
967 creating_job_epochs: Vec<(JobId, u64)>,
968 ) {
969 {
970 if let Some(prev_epoch) = self.completing_barrier.take() {
971 assert_eq!(command_prev_epoch, Some(prev_epoch));
972 self.committed_epoch = Some(prev_epoch);
973 } else {
974 assert_eq!(command_prev_epoch, None);
975 };
976 for (job_id, epoch) in creating_job_epochs {
977 self.creating_streaming_job_controls
978 .get_mut(&job_id)
979 .expect("should exist")
980 .ack_completed(epoch)
981 }
982 }
983 }
984
985 fn handle_refresh_table_info(
986 &self,
987 task: &mut Option<CompleteBarrierTask>,
988 resps: &[BarrierCompleteResponse],
989 ) {
990 let list_finished_info = resps
991 .iter()
992 .flat_map(|resp| resp.list_finished_sources.clone())
993 .collect::<Vec<_>>();
994 if !list_finished_info.is_empty() {
995 let task = task.get_or_insert_default();
996 task.list_finished_source_ids.extend(list_finished_info);
997 }
998
999 let load_finished_info = resps
1000 .iter()
1001 .flat_map(|resp| resp.load_finished_sources.clone())
1002 .collect::<Vec<_>>();
1003 if !load_finished_info.is_empty() {
1004 let task = task.get_or_insert_default();
1005 task.load_finished_source_ids.extend(load_finished_info);
1006 }
1007
1008 let refresh_finished_table_ids: Vec<JobId> = resps
1009 .iter()
1010 .flat_map(|resp| {
1011 resp.refresh_finished_tables
1012 .iter()
1013 .map(|table_id| table_id.as_job_id())
1014 })
1015 .collect::<Vec<_>>();
1016 if !refresh_finished_table_ids.is_empty() {
1017 let task = task.get_or_insert_default();
1018 task.refresh_finished_table_job_ids
1019 .extend(refresh_finished_table_ids);
1020 }
1021 }
1022}
1023
/// Per-barrier bookkeeping while the barrier is in flight.
#[derive(Debug)]
struct BarrierEpochState {
    /// Whether the barrier has been collected from the database graph.
    is_collected: bool,

    /// Creating jobs this barrier must additionally wait for.
    creating_jobs_to_wait: HashSet<JobId>,

    /// Jobs finished at this barrier: their responses and tracking info.
    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}
1033
1034impl BarrierEpochState {
1035 fn is_inflight(&self) -> bool {
1036 !self.is_collected || !self.creating_jobs_to_wait.is_empty()
1037 }
1038}
1039
impl DatabaseCheckpointControl {
    /// Inject the next barrier for this database, optionally carrying a
    /// `command` with its `notifiers`.
    ///
    /// Runs a series of pre-injection guards that may consume the notifiers
    /// and return early without injecting (multi-job cancel/throttle touching
    /// a creating snapshot-backfill job, blocked reschedules, empty database,
    /// snapshot-backfill creation while paused). Otherwise applies the
    /// command, and enqueues it keyed by the barrier's `prev_epoch`.
    ///
    /// Returns `Err` only when `apply_command` fails; in that case all
    /// notifiers are notified of the start failure first.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        // Split the optional (command, notifiers) pair; a barrier without a
        // command still proceeds with an empty notifier list.
        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        // An unresolved reschedule intent must never reach injection;
        // resolution is expected to happen upstream (debug-only invariant).
        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            // Guard: a multi-job drop that includes a creating
            // snapshot-backfill job is rejected outright — such a job can
            // only be cancelled in a single-job DDL.
            if streaming_job_ids.len() > 1 {
                for job_to_cancel in streaming_job_ids {
                    if self
                        .creating_streaming_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
                                the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(creating_job) =
                    self.creating_streaming_job_controls.get_mut(job_to_drop)
                && creating_job.drop(&mut notifiers, partial_graph_manager)
            {
                // Single-job drop of a creating job: if the creating-job
                // control handled the drop itself, no barrier is injected.
                return Ok(());
            }
        }

        // Guard: a multi-job throttle touching a creating snapshot-backfill
        // job is rejected; note a single-job throttle falls through here
        // (presumably handled elsewhere — NOTE(review): confirm).
        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(creating_job_id) = jobs
                .iter()
                .find(|job| self.creating_streaming_job_controls.contains_key(*job))
        {
            warn!(
                job_id = %creating_job_id,
                "ignore multi-job throttle command on creating snapshot backfill streaming job"
            );
            for notifier in notifiers {
                notifier
                    .notify_start_failed(anyhow!("cannot alter rate limit for snapshot backfill streaming job with other jobs, \
                    the original rate limit will be kept recovery.").into());
            }
            return Ok(());
        };

        // Guard: reject a reschedule whose plan overlaps jobs blocked by
        // in-flight creating (unreschedulable backfill) jobs.
        if let Some(Command::RescheduleIntent {
            reschedule_plan: Some(reschedule_plan),
            ..
        }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            let blocked_job_ids =
                self.collect_reschedule_blocked_jobs_for_creating_jobs_inflight()?;
            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
                &reschedule_plan.reschedules,
                &reschedule_plan.fragment_actors,
                &blocked_job_ids,
            );
            if !blocked_reschedule_job_ids.is_empty() {
                warn!(
                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
                );
                for notifier in notifiers {
                    notifier.notify_start_failed(
                        anyhow!(
                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
                            blocked_reschedule_job_ids
                        )
                        .into(),
                    );
                }
                return Ok(());
            }
        }

        // Guard: with no normal jobs in the database, any non-create command
        // is a no-op — acknowledge it immediately (started + collected)
        // without injecting a barrier.
        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        // Guard: snapshot-backfill creation is not allowed while the
        // database's barrier stream is paused.
        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        // All guards passed: advance the barrier state and record the epoch
        // on the tracing span for correlation.
        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        // Apply the command; `prev_epoch` is captured first because
        // `barrier_info` is moved into `apply_command`.
        let prev_epoch = barrier_info.prev_epoch();
        let ApplyCommandInfo { jobs_to_wait } = match self.apply_command(
            command,
            &mut notifiers,
            barrier_info,
            partial_graph_manager,
            hummock_version_stats,
            adaptive_parallelism_strategy,
            worker_nodes,
        ) {
            Ok(info) => {
                // On success, `apply_command` must have consumed (moved out)
                // every notifier.
                assert!(notifiers.is_empty());
                info
            }
            Err(err) => {
                // On failure, fail every pending notifier before bubbling up.
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                // Failpoint for injection-error testing.
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Track the injected barrier so collection can later match it by
        // its `prev_epoch`.
        self.enqueue_command(prev_epoch, jobs_to_wait);

        Ok(())
    }
}