1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::mem::take;
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use prometheus::HistogramTimer;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
27use risingwave_common::system_param::AdaptiveParallelismStrategy;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_meta_model::WorkerId;
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::{FragmentId, PartialGraphId};
33use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
34use risingwave_pb::stream_plan::stream_node::NodeBody;
35use risingwave_pb::stream_service::BarrierCompleteResponse;
36use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
37use tracing::{debug, warn};
38
39use crate::barrier::cdc_progress::CdcProgress;
40use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
41use crate::barrier::checkpoint::recovery::{
42 DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
43 RecoveringStateAction,
44};
45use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
46use crate::barrier::command::CommandContext;
47use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
48use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
49use crate::barrier::notifier::Notifier;
50use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager};
51use crate::barrier::progress::TrackingJob;
52use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
53use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
54use crate::barrier::utils::collect_creating_job_commit_epoch_info;
55use crate::barrier::{
56 BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
57};
58use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
59use crate::manager::MetaSrvEnv;
60use crate::model::ActorId;
61use crate::rpc::metrics::GLOBAL_META_METRICS;
62use crate::{MetaError, MetaResult};
63
/// Top-level controller tracking per-database barrier/checkpoint progress for
/// the barrier manager.
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    /// Per-database status: either actively running or in recovery.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Latest hummock version stats; refreshed on each barrier-completion ack.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// Max number of in-flight (injected but uncollected) barriers per database,
    /// copied from `env.opts` at construction.
    pub(crate) in_flight_barrier_nums: usize,
}
71
72impl CheckpointControl {
73 pub fn new(env: MetaSrvEnv) -> Self {
74 Self {
75 in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
76 env,
77 databases: Default::default(),
78 hummock_version_stats: Default::default(),
79 }
80 }
81
82 pub(crate) fn recover(
83 databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
84 failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, hummock_version_stats: HummockVersionStats,
86 env: MetaSrvEnv,
87 ) -> Self {
88 env.shared_actor_infos()
89 .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
90 Self {
91 in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
92 env,
93 databases: databases
94 .into_iter()
95 .map(|(database_id, control)| {
96 (
97 database_id,
98 DatabaseCheckpointControlStatus::Running(control),
99 )
100 })
101 .chain(failed_databases.into_iter().map(
102 |(database_id, resetting_partial_graphs)| {
103 (
104 database_id,
105 DatabaseCheckpointControlStatus::Recovering(
106 DatabaseRecoveringState::new_resetting(
107 database_id,
108 resetting_partial_graphs,
109 ),
110 ),
111 )
112 },
113 ))
114 .collect(),
115 hummock_version_stats,
116 }
117 }
118
119 pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
120 self.hummock_version_stats = output.hummock_version_stats;
121 for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
122 self.databases
123 .get_mut(&database_id)
124 .expect("should exist")
125 .expect_running("should have wait for completing command before enter recovery")
126 .ack_completed(command_prev_epoch, creating_job_epochs);
127 }
128 }
129
130 pub(crate) fn next_complete_barrier_task(
131 &mut self,
132 mut context: Option<(&mut PeriodicBarriers, &mut PartialGraphManager)>,
133 ) -> Option<CompleteBarrierTask> {
134 let mut task = None;
135 for database in self.databases.values_mut() {
136 let Some(database) = database.running_state_mut() else {
137 continue;
138 };
139 let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
140 database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
141 }
142 task
143 }
144
145 pub(crate) fn barrier_collected(
146 &mut self,
147 partial_graph_id: PartialGraphId,
148 collected_barrier: CollectedBarrier,
149 periodic_barriers: &mut PeriodicBarriers,
150 ) -> MetaResult<()> {
151 let (database_id, _) = from_partial_graph_id(partial_graph_id);
152 let database_status = self.databases.get_mut(&database_id).expect("should exist");
153 match database_status {
154 DatabaseCheckpointControlStatus::Running(database) => {
155 database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
156 }
157 DatabaseCheckpointControlStatus::Recovering(_) => {
158 if cfg!(debug_assertions) {
159 panic!(
160 "receive collected barrier {:?} on recovering database {} from partial graph {}",
161 collected_barrier, database_id, partial_graph_id
162 );
163 } else {
164 warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
165 }
166 Ok(())
167 }
168 }
169 }
170
171 pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
172 self.databases.iter().filter_map(|(database_id, database)| {
173 database.running_state().is_none().then_some(*database_id)
174 })
175 }
176
177 pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
178 self.databases.iter().filter_map(|(database_id, database)| {
179 database.running_state().is_some().then_some(*database_id)
180 })
181 }
182
183 pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
184 self.databases
185 .get(&database_id)
186 .and_then(|database| database.running_state())
187 .map(|database| &database.database_info)
188 }
189
190 pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
191 self.databases
192 .values()
193 .any(|database| database.may_have_snapshot_backfilling_jobs())
194 }
195
    /// Inject a new barrier (optionally carrying a command) into the target
    /// database, bootstrapping per-database state when the first job arrives.
    ///
    /// Command-less (periodic) barriers are silently skipped for unknown or
    /// recovering databases and when the in-flight barrier limit is reached.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            // For cross-db snapshot backfill: resolve the snapshot epoch of each
            // upstream table from the committed epoch of its owning (running)
            // database before the barrier is injected.
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    // No running database owns the upstream table (or it has no
                    // committed epoch yet): fail the job start instead of
                    // injecting the barrier.
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                // The database has no state yet: only a *normal*
                // `CreateStreamingJob` may bootstrap it.
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        // Register the database-level partial graph (no creating job).
                        let adder = partial_graph_manager
                            .add_partial_graph(to_partial_graph_id(database_id, None));
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    // These commands are no-ops on a non-existent database:
                    // acknowledge waiters and skip.
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    // These commands require an existing database: a bug in
                    // debug builds; fail the waiters in release builds.
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        } else {
            // Periodic barrier without a command.
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            // Back-pressure: respect the configured in-flight barrier limit.
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        }
    }
358
359 pub(crate) fn update_barrier_nums_metrics(&self) {
360 self.databases
361 .values()
362 .flat_map(|database| database.running_state())
363 .for_each(|database| database.update_barrier_nums_metrics());
364 }
365
366 pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
367 let mut progress = HashMap::new();
368 for status in self.databases.values() {
369 let Some(database_checkpoint_control) = status.running_state() else {
370 continue;
371 };
372 progress.extend(
374 database_checkpoint_control
375 .database_info
376 .gen_backfill_progress(),
377 );
378 for (job_id, creating_job) in
380 &database_checkpoint_control.creating_streaming_job_controls
381 {
382 progress.extend([(*job_id, creating_job.gen_backfill_progress())]);
383 }
384 }
385 progress
386 }
387
388 pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
389 let mut progress = Vec::new();
390 for status in self.databases.values() {
391 let Some(database_checkpoint_control) = status.running_state() else {
392 continue;
393 };
394 progress.extend(
395 database_checkpoint_control
396 .database_info
397 .gen_fragment_backfill_progress(),
398 );
399 for creating_job in database_checkpoint_control
400 .creating_streaming_job_controls
401 .values()
402 {
403 progress.extend(creating_job.gen_fragment_backfill_progress());
404 }
405 }
406 progress
407 }
408
409 pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
410 let mut progress = HashMap::new();
411 for status in self.databases.values() {
412 let Some(database_checkpoint_control) = status.running_state() else {
413 continue;
414 };
415 progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
417 }
418 progress
419 }
420
421 pub(crate) fn databases_failed_at_worker_err(
422 &mut self,
423 worker_id: WorkerId,
424 ) -> impl Iterator<Item = DatabaseId> + '_ {
425 self.databases
426 .iter_mut()
427 .filter_map(
428 move |(database_id, database_status)| match database_status {
429 DatabaseCheckpointControlStatus::Running(control) => {
430 if !control.is_valid_after_worker_err(worker_id) {
431 Some(*database_id)
432 } else {
433 None
434 }
435 }
436 DatabaseCheckpointControlStatus::Recovering(state) => {
437 if !state.is_valid_after_worker_err(worker_id) {
438 Some(*database_id)
439 } else {
440 None
441 }
442 }
443 },
444 )
445 }
446
447 pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
448 for (_, node) in self.databases.values_mut().flat_map(|status| {
449 status
450 .running_state_mut()
451 .map(|database| take(&mut database.command_ctx_queue))
452 .into_iter()
453 .flatten()
454 }) {
455 for notifier in node.notifiers {
456 notifier.notify_collection_failed(err.clone());
457 }
458 node.enqueue_time.observe_duration();
459 }
460 }
461}
462
/// Events yielded by `CheckpointControl::next_event` when a recovering database
/// is ready to advance its recovery state machine.
pub(crate) enum CheckpointControlEvent<'a> {
    /// The database may begin re-initializing its partial graphs.
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// The database may transition back to the running state.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
467
468impl CheckpointControl {
    /// Handle completion of a partial-graph reset reported by workers.
    ///
    /// For a running database this can only target a creating job's partial
    /// graph (the job is removed and notified); for a recovering database the
    /// response advances its recovery state machine.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, creating_job) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(creating_job_id) = creating_job {
                    let Some(job) = database
                        .creating_streaming_job_controls
                        .remove(&creating_job_id)
                    else {
                        // Unknown creating job: a bug in debug builds, but
                        // tolerated with a warning in release builds.
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job {creating_job_id} in database {database_id}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            "ignore reset partial graph resp on non-existing creating job on running database"
                        );
                        return;
                    };
                    job.on_partial_graph_reset();
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }
504
505 pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
506 let (database_id, _) = from_partial_graph_id(partial_graph_id);
507 match self.databases.get_mut(&database_id).expect("should exist") {
508 DatabaseCheckpointControlStatus::Running(_) => {
509 unreachable!("should not have partial graph initialized when running")
510 }
511 DatabaseCheckpointControlStatus::Recovering(state) => {
512 state.partial_graph_initialized(partial_graph_id);
513 }
514 }
515 }
516
    /// Future resolving to the next recovery-state event of any recovering
    /// database; stays pending while no database has an event ready.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        // `self` is held in an `Option` so the full `&mut self` borrow can be
        // reclaimed (via `take`) once an event is ready, letting the returned
        // action borrow `self` for the caller.
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
554}
555
/// Lifecycle status of a single database within `CheckpointControl`.
pub(crate) enum DatabaseCheckpointControlStatus {
    /// Barriers are flowing normally.
    Running(DatabaseCheckpointControl),
    /// The database is being reset/re-initialized after a failure.
    Recovering(DatabaseRecoveringState),
}
560
561impl DatabaseCheckpointControlStatus {
562 fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
563 match self {
564 DatabaseCheckpointControlStatus::Running(state) => Some(state),
565 DatabaseCheckpointControlStatus::Recovering(_) => None,
566 }
567 }
568
569 fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
570 match self {
571 DatabaseCheckpointControlStatus::Running(state) => Some(state),
572 DatabaseCheckpointControlStatus::Recovering(_) => None,
573 }
574 }
575
576 fn may_have_snapshot_backfilling_jobs(&self) -> bool {
577 self.running_state()
578 .map(|database| !database.creating_streaming_job_controls.is_empty())
579 .unwrap_or(true) }
581
582 fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
583 match self {
584 DatabaseCheckpointControlStatus::Running(state) => state,
585 DatabaseCheckpointControlStatus::Recovering(_) => {
586 panic!("should be at running: {}", reason)
587 }
588 }
589 }
590}
591
/// Per-database metric handles, label-guarded by the database id.
struct DatabaseCheckpointControlMetrics {
    // Latency histogram: timer starts on enqueue, observed on completion/failure.
    barrier_latency: LabelGuardedHistogram,
    // Gauge of barriers injected but not yet fully collected.
    in_flight_barrier_nums: LabelGuardedIntGauge,
    // Gauge of all queued barriers, including the one being committed.
    all_barrier_nums: LabelGuardedIntGauge,
}
597
598impl DatabaseCheckpointControlMetrics {
599 fn new(database_id: DatabaseId) -> Self {
600 let database_id_str = database_id.to_string();
601 let barrier_latency = GLOBAL_META_METRICS
602 .barrier_latency
603 .with_guarded_label_values(&[&database_id_str]);
604 let in_flight_barrier_nums = GLOBAL_META_METRICS
605 .in_flight_barrier_nums
606 .with_guarded_label_values(&[&database_id_str]);
607 let all_barrier_nums = GLOBAL_META_METRICS
608 .all_barrier_nums
609 .with_guarded_label_values(&[&database_id_str]);
610 Self {
611 barrier_latency,
612 in_flight_barrier_nums,
613 all_barrier_nums,
614 }
615 }
616}
617
/// Per-database barrier/checkpoint state machine while the database is running.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    pub(super) state: BarrierWorkerState,

    // Injected barriers awaiting collection/commit, keyed by prev epoch.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    // Prev epoch of the barrier currently being committed, if any.
    completing_barrier: Option<u64>,

    // Latest committed epoch; `None` until the first commit after creation.
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    // Controls for snapshot-backfilling streaming jobs still being created,
    // each running on its own partial graph.
    pub creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    metrics: DatabaseCheckpointControlMetrics,
}
637
638impl DatabaseCheckpointControl {
639 fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
640 Self {
641 database_id,
642 state: BarrierWorkerState::new(),
643 command_ctx_queue: Default::default(),
644 completing_barrier: None,
645 committed_epoch: None,
646 database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
647 creating_streaming_job_controls: Default::default(),
648 metrics: DatabaseCheckpointControlMetrics::new(database_id),
649 }
650 }
651
652 pub(crate) fn recovery(
653 database_id: DatabaseId,
654 state: BarrierWorkerState,
655 committed_epoch: u64,
656 database_info: InflightDatabaseInfo,
657 creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
658 ) -> Self {
659 Self {
660 database_id,
661 state,
662 command_ctx_queue: Default::default(),
663 completing_barrier: None,
664 committed_epoch: Some(committed_epoch),
665 database_info,
666 creating_streaming_job_controls,
667 metrics: DatabaseCheckpointControlMetrics::new(database_id),
668 }
669 }
670
671 pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
672 !self.database_info.contains_worker(worker_id as _)
673 && self
674 .creating_streaming_job_controls
675 .values()
676 .all(|job| job.is_valid_after_worker_err(worker_id))
677 }
678
679 fn total_command_num(&self) -> usize {
680 self.command_ctx_queue.len()
681 + match &self.completing_barrier {
682 Some(_) => 1,
683 None => 0,
684 }
685 }
686
687 fn update_barrier_nums_metrics(&self) {
689 self.metrics.in_flight_barrier_nums.set(
690 self.command_ctx_queue
691 .values()
692 .filter(|x| x.state.is_inflight())
693 .count() as i64,
694 );
695 self.metrics
696 .all_barrier_nums
697 .set(self.total_command_num() as i64);
698 }
699
    /// Enqueue a newly injected barrier, recording which creating jobs its
    /// collection must wait for and starting its latency timer.
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        // Epochs must be contiguous: the new barrier's prev epoch equals the
        // last queued barrier's curr epoch.
        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            "enqueue command"
        );
        // Keyed by prev epoch; collection and completion look nodes up by it.
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    is_collected: false,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }
736
    /// Record that a barrier was collected from all actors of the given partial
    /// graph — either this database's own graph or one of its creating jobs.
    fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let prev_epoch = collected_barrier.epoch.prev;
        tracing::trace!(
            prev_epoch,
            partial_graph_id = %partial_graph_id,
            "barrier collected"
        );
        let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
        assert_eq!(self.database_id, database_id);
        match creating_job_id {
            None => {
                // Database graph barrier: mark the queued epoch node collected
                // and stash per-worker responses for later commit.
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(!node.state.is_collected);
                    node.state.is_collected = true;
                    node.state
                        .resps
                        .extend(collected_barrier.resps.into_values());
                } else {
                    // Collecting an unknown epoch is an invariant violation.
                    panic!(
                        "collect barrier on non-existing barrier: {}, {:?}",
                        prev_epoch, collected_barrier
                    );
                }
            }
            Some(creating_job_id) => {
                // Creating-job barrier: forward to the job's control. When the
                // job becomes ready to merge into the upstream graph, force a
                // checkpoint on the next barrier to trigger the merge.
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(collected_barrier);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }
781
782 fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
784 self.command_ctx_queue
785 .values()
786 .filter(|x| x.state.is_inflight())
787 .count()
788 < in_flight_barrier_nums
789 }
790}
791
792impl DatabaseCheckpointControl {
793 fn collect_backfill_pinned_upstream_log_epoch(
795 &self,
796 ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
797 self.creating_streaming_job_controls
798 .iter()
799 .map(|(job_id, creating_job)| (*job_id, creating_job.pinned_upstream_log_epoch()))
800 .collect()
801 }
802
803 fn collect_no_shuffle_fragment_relations_for_reschedule_check(
804 &self,
805 ) -> Vec<(FragmentId, FragmentId)> {
806 let mut no_shuffle_relations = Vec::new();
807 for fragment in self.database_info.fragment_infos() {
808 let downstream_fragment_id = fragment.fragment_id;
809 visit_stream_node_cont(&fragment.nodes, |node| {
810 if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
811 && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
812 {
813 no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
814 }
815 true
816 });
817 }
818
819 for creating_job in self.creating_streaming_job_controls.values() {
820 creating_job.collect_no_shuffle_fragment_relations(&mut no_shuffle_relations);
821 }
822 no_shuffle_relations
823 }
824
    /// Compute jobs that must not be rescheduled while creating jobs are
    /// inflight: fragments blocked by creating jobs, expanded across the
    /// no-shuffle graphs they belong to, then mapped back to owning jobs.
    fn collect_reschedule_blocked_jobs_for_creating_jobs_inflight(
        &self,
    ) -> MetaResult<HashSet<JobId>> {
        let mut initial_blocked_fragment_ids = HashSet::new();
        for creating_job in self.creating_streaming_job_controls.values() {
            creating_job.collect_reschedule_blocked_fragment_ids(&mut initial_blocked_fragment_ids);
        }

        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
        if !initial_blocked_fragment_ids.is_empty() {
            let no_shuffle_relations =
                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
            let (forward_edges, backward_edges) =
                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
            let initial_blocked_fragment_ids: Vec<_> =
                initial_blocked_fragment_ids.iter().copied().collect();
            // Any fragment transitively connected to a blocked fragment through
            // no-shuffle edges is blocked as well.
            for ensemble in find_no_shuffle_graphs(
                &initial_blocked_fragment_ids,
                &forward_edges,
                &backward_edges,
            )? {
                blocked_fragment_ids.extend(ensemble.fragments());
            }
        }

        let mut blocked_job_ids = HashSet::new();
        blocked_job_ids.extend(
            blocked_fragment_ids
                .into_iter()
                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
        );
        Ok(blocked_job_ids)
    }
858
859 fn collect_reschedule_blocked_job_ids(
860 &self,
861 reschedules: &HashMap<FragmentId, Reschedule>,
862 fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
863 blocked_job_ids: &HashSet<JobId>,
864 ) -> HashSet<JobId> {
865 let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
866 affected_fragment_ids.extend(fragment_actors.keys().copied());
867 for reschedule in reschedules.values() {
868 affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
869 affected_fragment_ids.extend(
870 reschedule
871 .upstream_fragment_dispatcher_ids
872 .iter()
873 .map(|(fragment_id, _)| *fragment_id),
874 );
875 }
876
877 affected_fragment_ids
878 .into_iter()
879 .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
880 .filter(|job_id| blocked_job_ids.contains(job_id))
881 .collect()
882 }
883
    /// Collect the next batch of commit work for this database: committable
    /// epochs of creating jobs, plus at most one fully-collected barrier from
    /// the head of the queue (only checkpoint barriers produce commit work).
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut PartialGraphManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `(job_id, epoch, resps, create_info)` for creating-job epochs ready
        // to commit alongside this database's barrier.
        let mut creating_jobs_task = vec![];
        if let Some(committed_epoch) = self.committed_epoch {
            // Creating jobs that have fully finished and should merge upstream.
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier, committed_epoch)
                {
                    let create_info = match status {
                        CompleteJobType::First(info) => Some(info),
                        CompleteJobType::Normal => None,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, create_info));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, partial_graph_manager)) = &mut context
            {
                // Tear down the dedicated partial graphs of finished jobs.
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, _, _)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                // Attach each finished job to the upstream barrier epoch that
                // was waiting for it so they commit atomically.
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        // Only one barrier of this database may be completing at a time.
        assert!(self.completing_barrier.is_none());
        // Drain collected (non-inflight) barriers from the queue head in epoch
        // order; stop at the first checkpoint barrier, which becomes the task.
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.is_collected);

                self.handle_refresh_table_info(task, &node);

                self.database_info.apply_collected_command(
                    &node.command_ctx.command,
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    // Non-checkpoint barrier: nothing to commit; notify waiters
                    // right away and keep draining.
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    // If finished jobs are pending but no checkpoint barrier is
                    // queued, force one so they can eventually be committed.
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.database_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                // Checkpoint barrier: fold staged commit info, the finished
                // jobs' responses, and this epoch's responses into the task.
                let mut staging_commit_info = self.database_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, create_info) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    create_info.as_ref(),
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch, create_info));
            }
        }
    }
1029
1030 fn ack_completed(
1031 &mut self,
1032 command_prev_epoch: Option<u64>,
1033 creating_job_epochs: Vec<(JobId, u64)>,
1034 ) {
1035 {
1036 if let Some(prev_epoch) = self.completing_barrier.take() {
1037 assert_eq!(command_prev_epoch, Some(prev_epoch));
1038 self.committed_epoch = Some(prev_epoch);
1039 } else {
1040 assert_eq!(command_prev_epoch, None);
1041 };
1042 for (job_id, epoch) in creating_job_epochs {
1043 self.creating_streaming_job_controls
1044 .get_mut(&job_id)
1045 .expect("should exist")
1046 .ack_completed(epoch)
1047 }
1048 }
1049 }
1050
1051 fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
1052 let list_finished_info = node
1053 .state
1054 .resps
1055 .iter()
1056 .flat_map(|resp| resp.list_finished_sources.clone())
1057 .collect::<Vec<_>>();
1058 if !list_finished_info.is_empty() {
1059 let task = task.get_or_insert_default();
1060 task.list_finished_source_ids.extend(list_finished_info);
1061 }
1062
1063 let load_finished_info = node
1064 .state
1065 .resps
1066 .iter()
1067 .flat_map(|resp| resp.load_finished_sources.clone())
1068 .collect::<Vec<_>>();
1069 if !load_finished_info.is_empty() {
1070 let task = task.get_or_insert_default();
1071 task.load_finished_source_ids.extend(load_finished_info);
1072 }
1073
1074 let refresh_finished_table_ids: Vec<JobId> = node
1075 .state
1076 .resps
1077 .iter()
1078 .flat_map(|resp| {
1079 resp.refresh_finished_tables
1080 .iter()
1081 .map(|table_id| table_id.as_job_id())
1082 })
1083 .collect::<Vec<_>>();
1084 if !refresh_finished_table_ids.is_empty() {
1085 let task = task.get_or_insert_default();
1086 task.refresh_finished_table_job_ids
1087 .extend(refresh_finished_table_ids);
1088 }
1089 }
1090}
1091
/// A barrier that has been injected for one epoch of a database graph and is
/// waiting to be collected and committed.
struct EpochNode {
    // Timer started when the barrier was enqueued; moved into the complete
    // task together with `command_ctx`, presumably to observe end-to-end
    // barrier latency — TODO confirm against the metric it feeds.
    enqueue_time: HistogramTimer,

    // Collection state of this barrier (responses, pending creating jobs, …).
    state: BarrierEpochState,

    // Context of the command carried by this barrier; consumed when the
    // barrier's commit-epoch info is collected into the complete task.
    command_ctx: CommandContext,
    // Notifiers to be moved into the complete task and signaled on completion.
    notifiers: Vec<Notifier>,
}
1105
/// Progress of a single barrier epoch while it is in flight.
#[derive(Debug)]
struct BarrierEpochState {
    // Whether the barrier itself has been collected; the epoch stays
    // in flight until this is true (see `is_inflight`).
    is_collected: bool,

    // Collected `BarrierCompleteResponse`s; taken (drained) when the
    // commit-epoch info for this barrier is assembled.
    resps: Vec<BarrierCompleteResponse>,

    // Creating (snapshot-backfill) jobs this epoch still waits on; the epoch
    // stays in flight until this set is empty (see `is_inflight`).
    creating_jobs_to_wait: HashSet<JobId>,

    // Jobs that finished at this epoch, keyed by job id, with their collected
    // responses and the tracking handle for the finished job.
    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}
1117
1118impl BarrierEpochState {
1119 fn is_inflight(&self) -> bool {
1120 !self.is_collected || !self.creating_jobs_to_wait.is_empty()
1121 }
1122}
1123
1124impl DatabaseCheckpointControl {
1125 fn handle_new_barrier(
1127 &mut self,
1128 command: Option<(Command, Vec<Notifier>)>,
1129 checkpoint: bool,
1130 span: tracing::Span,
1131 partial_graph_manager: &mut PartialGraphManager,
1132 hummock_version_stats: &HummockVersionStats,
1133 adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
1134 worker_nodes: &HashMap<WorkerId, WorkerNode>,
1135 ) -> MetaResult<()> {
1136 let curr_epoch = self.state.in_flight_prev_epoch().next();
1137
1138 let (command, mut notifiers) = if let Some((command, notifiers)) = command {
1139 (Some(command), notifiers)
1140 } else {
1141 (None, vec![])
1142 };
1143
1144 debug_assert!(
1145 !matches!(
1146 &command,
1147 Some(Command::RescheduleIntent {
1148 reschedule_plan: None,
1149 ..
1150 })
1151 ),
1152 "reschedule intent should be resolved before injection"
1153 );
1154
1155 if let Some(Command::DropStreamingJobs {
1156 streaming_job_ids, ..
1157 }) = &command
1158 {
1159 if streaming_job_ids.len() > 1 {
1160 for job_to_cancel in streaming_job_ids {
1161 if self
1162 .creating_streaming_job_controls
1163 .contains_key(job_to_cancel)
1164 {
1165 warn!(
1166 job_id = %job_to_cancel,
1167 "ignore multi-job cancel command on creating snapshot backfill streaming job"
1168 );
1169 for notifier in notifiers {
1170 notifier
1171 .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
1172 the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
1173 }
1174 return Ok(());
1175 }
1176 }
1177 } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
1178 && let Some(creating_job) =
1179 self.creating_streaming_job_controls.get_mut(job_to_drop)
1180 && creating_job.drop(&mut notifiers, partial_graph_manager)
1181 {
1182 return Ok(());
1183 }
1184 }
1185
1186 if let Some(Command::Throttle { jobs, .. }) = &command
1187 && jobs.len() > 1
1188 && let Some(creating_job_id) = jobs
1189 .iter()
1190 .find(|job| self.creating_streaming_job_controls.contains_key(*job))
1191 {
1192 warn!(
1193 job_id = %creating_job_id,
1194 "ignore multi-job throttle command on creating snapshot backfill streaming job"
1195 );
1196 for notifier in notifiers {
1197 notifier
1198 .notify_start_failed(anyhow!("cannot alter rate limit for snapshot backfill streaming job with other jobs, \
1199 the original rate limit will be kept recovery.").into());
1200 }
1201 return Ok(());
1202 };
1203
1204 if let Some(Command::RescheduleIntent {
1205 reschedule_plan: Some(reschedule_plan),
1206 ..
1207 }) = &command
1208 && !self.creating_streaming_job_controls.is_empty()
1209 {
1210 let blocked_job_ids =
1211 self.collect_reschedule_blocked_jobs_for_creating_jobs_inflight()?;
1212 let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
1213 &reschedule_plan.reschedules,
1214 &reschedule_plan.fragment_actors,
1215 &blocked_job_ids,
1216 );
1217 if !blocked_reschedule_job_ids.is_empty() {
1218 warn!(
1219 blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
1220 "reject reschedule fragments related to creating unreschedulable backfill jobs"
1221 );
1222 for notifier in notifiers {
1223 notifier.notify_start_failed(
1224 anyhow!(
1225 "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
1226 blocked_reschedule_job_ids
1227 )
1228 .into(),
1229 );
1230 }
1231 return Ok(());
1232 }
1233 }
1234
1235 if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
1236 && self.database_info.is_empty()
1237 {
1238 assert!(
1239 self.creating_streaming_job_controls.is_empty(),
1240 "should not have snapshot backfill job when there is no normal job in database"
1241 );
1242 for mut notifier in notifiers {
1244 notifier.notify_started();
1245 notifier.notify_collected();
1246 }
1247 return Ok(());
1248 };
1249
1250 if let Some(Command::CreateStreamingJob {
1251 job_type: CreateStreamingJobType::SnapshotBackfill(_),
1252 ..
1253 }) = &command
1254 && self.state.is_paused()
1255 {
1256 warn!("cannot create streaming job with snapshot backfill when paused");
1257 for notifier in notifiers {
1258 notifier.notify_start_failed(
1259 anyhow!("cannot create streaming job with snapshot backfill when paused",)
1260 .into(),
1261 );
1262 }
1263 return Ok(());
1264 }
1265
1266 let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
1267 barrier_info.prev_epoch.span().in_scope(|| {
1269 tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
1270 });
1271 span.record("epoch", barrier_info.curr_epoch());
1272
1273 let ApplyCommandInfo {
1274 mv_subscription_max_retention,
1275 table_ids_to_commit,
1276 jobs_to_wait,
1277 command,
1278 } = match self.apply_command(
1279 command,
1280 &mut notifiers,
1281 &barrier_info,
1282 partial_graph_manager,
1283 hummock_version_stats,
1284 adaptive_parallelism_strategy,
1285 worker_nodes,
1286 ) {
1287 Ok(info) => info,
1288 Err(err) => {
1289 for notifier in notifiers {
1290 notifier.notify_start_failed(err.clone());
1291 }
1292 fail_point!("inject_barrier_err_success");
1293 return Err(err);
1294 }
1295 };
1296
1297 notifiers.iter_mut().for_each(|n| n.notify_started());
1299
1300 let command_ctx = CommandContext::new(
1301 barrier_info,
1302 mv_subscription_max_retention,
1303 table_ids_to_commit,
1304 command,
1305 span,
1306 );
1307
1308 self.enqueue_command(command_ctx, notifiers, jobs_to_wait);
1310
1311 Ok(())
1312 }
1313}