1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::mem::take;
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use prometheus::HistogramTimer;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_meta_model::WorkerId;
29use risingwave_pb::hummock::HummockVersionStats;
30use risingwave_pb::id::{FragmentId, PartialGraphId};
31use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
32use risingwave_pb::stream_plan::stream_node::NodeBody;
33use risingwave_pb::stream_service::BarrierCompleteResponse;
34use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
35use tracing::{debug, warn};
36
37use crate::barrier::cdc_progress::CdcProgress;
38use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
39use crate::barrier::checkpoint::recovery::{
40 DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
41 RecoveringStateAction,
42};
43use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
44use crate::barrier::command::CommandContext;
45use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
46use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
47use crate::barrier::notifier::Notifier;
48use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager};
49use crate::barrier::progress::TrackingJob;
50use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
51use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
52use crate::barrier::utils::collect_creating_job_commit_epoch_info;
53use crate::barrier::{
54 BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
55};
56use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
57use crate::manager::MetaSrvEnv;
58use crate::model::ActorId;
59use crate::rpc::metrics::GLOBAL_META_METRICS;
60use crate::{MetaError, MetaResult};
61
/// Top-level controller of barrier checkpointing across all databases on this meta node.
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    /// Per-database checkpoint state; each database is either `Running` or `Recovering`.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Latest hummock version stats, refreshed on every completed-barrier ack.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// Max number of in-flight (injected but not yet collected) barriers per database,
    /// copied from `env.opts` at construction.
    pub(crate) in_flight_barrier_nums: usize,
}
69
70impl CheckpointControl {
71 pub fn new(env: MetaSrvEnv) -> Self {
72 Self {
73 in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
74 env,
75 databases: Default::default(),
76 hummock_version_stats: Default::default(),
77 }
78 }
79
80 pub(crate) fn recover(
81 databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
82 failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, hummock_version_stats: HummockVersionStats,
84 env: MetaSrvEnv,
85 ) -> Self {
86 env.shared_actor_infos()
87 .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
88 Self {
89 in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
90 env,
91 databases: databases
92 .into_iter()
93 .map(|(database_id, control)| {
94 (
95 database_id,
96 DatabaseCheckpointControlStatus::Running(control),
97 )
98 })
99 .chain(failed_databases.into_iter().map(
100 |(database_id, resetting_partial_graphs)| {
101 (
102 database_id,
103 DatabaseCheckpointControlStatus::Recovering(
104 DatabaseRecoveringState::new_resetting(
105 database_id,
106 resetting_partial_graphs,
107 ),
108 ),
109 )
110 },
111 ))
112 .collect(),
113 hummock_version_stats,
114 }
115 }
116
    /// Apply the result of a completed (committed) barrier batch: refresh the
    /// cached hummock version stats and ack each database's completed epochs.
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }
127
    /// Gather the next barrier completion task across all running databases.
    ///
    /// Returns `None` when no database has a completable barrier. `context` is
    /// optional so callers that only want to drain pending tasks can skip the
    /// side effects on periodic barriers / partial graphs.
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut PartialGraphManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            // Reborrow the mutable references so `context` stays usable in the
            // next loop iteration.
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }
142
    /// Route a barrier collected from a partial graph to its owning database.
    ///
    /// A barrier collected for a recovering database is ignored in release
    /// builds (its in-flight state was already discarded on reset) and panics
    /// in debug builds.
    pub(crate) fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(_) => {
                if cfg!(debug_assertions) {
                    panic!(
                        "receive collected barrier {:?} on recovering database {} from partial graph {}",
                        collected_barrier, database_id, partial_graph_id
                    );
                } else {
                    warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
                }
                Ok(())
            }
        }
    }
168
169 pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
170 self.databases.iter().filter_map(|(database_id, database)| {
171 database.running_state().is_none().then_some(*database_id)
172 })
173 }
174
175 pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
176 self.databases.iter().filter_map(|(database_id, database)| {
177 database.running_state().is_some().then_some(*database_id)
178 })
179 }
180
181 pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
182 self.databases
183 .get(&database_id)
184 .and_then(|database| database.running_state())
185 .map(|database| &database.database_info)
186 }
187
188 pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
189 self.databases
190 .values()
191 .any(|database| database.may_have_snapshot_backfilling_jobs())
192 }
193
    /// Inject a newly scheduled barrier (optionally carrying a command) into
    /// its target database.
    ///
    /// For a command barrier, this first resolves cross-db snapshot backfill
    /// epochs, then locates (or, for the first normal `CreateStreamingJob`,
    /// creates) the database's checkpoint control. Command-less barriers are
    /// silently skipped for unknown / recovering / saturated databases.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                // Fill each cross-db upstream table's snapshot epoch from the
                // committed epoch of the database that owns it.
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        // No running database owns the upstream table: fail the
                        // command back to its notifiers instead of injecting it.
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    // The very first barrier of a database can only be a normal
                    // `CreateStreamingJob`; it bootstraps the database graph.
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        // Register the database-level partial graph (no creating job).
                        let adder = partial_graph_manager
                            .add_partial_graph(to_partial_graph_id(database_id, None));
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    // Commands that are trivially satisfied on an empty database:
                    // ack them immediately and skip injection.
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    // Commands that require an existing database graph: this is a
                    // bug in debug builds, and a reported failure in release.
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
            )
        } else {
            // Periodic (command-less) barrier: best-effort injection only.
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            // Respect the in-flight barrier back-pressure limit.
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
            )
        }
    }
350
351 pub(crate) fn update_barrier_nums_metrics(&self) {
352 self.databases
353 .values()
354 .flat_map(|database| database.running_state())
355 .for_each(|database| database.update_barrier_nums_metrics());
356 }
357
358 pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
359 let mut progress = HashMap::new();
360 for status in self.databases.values() {
361 let Some(database_checkpoint_control) = status.running_state() else {
362 continue;
363 };
364 progress.extend(
366 database_checkpoint_control
367 .database_info
368 .gen_backfill_progress(),
369 );
370 for (job_id, creating_job) in
372 &database_checkpoint_control.creating_streaming_job_controls
373 {
374 progress.extend([(*job_id, creating_job.gen_backfill_progress())]);
375 }
376 }
377 progress
378 }
379
380 pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
381 let mut progress = Vec::new();
382 for status in self.databases.values() {
383 let Some(database_checkpoint_control) = status.running_state() else {
384 continue;
385 };
386 progress.extend(
387 database_checkpoint_control
388 .database_info
389 .gen_fragment_backfill_progress(),
390 );
391 for creating_job in database_checkpoint_control
392 .creating_streaming_job_controls
393 .values()
394 {
395 progress.extend(creating_job.gen_fragment_backfill_progress());
396 }
397 }
398 progress
399 }
400
401 pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
402 let mut progress = HashMap::new();
403 for status in self.databases.values() {
404 let Some(database_checkpoint_control) = status.running_state() else {
405 continue;
406 };
407 progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
409 }
410 progress
411 }
412
413 pub(crate) fn databases_failed_at_worker_err(
414 &mut self,
415 worker_id: WorkerId,
416 ) -> impl Iterator<Item = DatabaseId> + '_ {
417 self.databases
418 .iter_mut()
419 .filter_map(
420 move |(database_id, database_status)| match database_status {
421 DatabaseCheckpointControlStatus::Running(control) => {
422 if !control.is_valid_after_worker_err(worker_id) {
423 Some(*database_id)
424 } else {
425 None
426 }
427 }
428 DatabaseCheckpointControlStatus::Recovering(state) => {
429 if !state.is_valid_after_worker_err(worker_id) {
430 Some(*database_id)
431 } else {
432 None
433 }
434 }
435 },
436 )
437 }
438
439 pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
440 for (_, node) in self.databases.values_mut().flat_map(|status| {
441 status
442 .running_state_mut()
443 .map(|database| take(&mut database.command_ctx_queue))
444 .into_iter()
445 .flatten()
446 }) {
447 for notifier in node.notifiers {
448 notifier.notify_collection_failed(err.clone());
449 }
450 node.enqueue_time.observe_duration();
451 }
452 }
453}
454
/// Events yielded by [`CheckpointControl::next_event`] when a recovering
/// database is ready to move to its next recovery stage.
pub(crate) enum CheckpointControlEvent<'a> {
    /// The database is ready to start initializing its partial graphs.
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// The database has finished recovery and is ready to run.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
459
460impl CheckpointControl {
    /// Handle the acknowledgement that a partial graph has been reset on the
    /// given workers.
    ///
    /// For a running database this may only target a creating-job partial graph
    /// (that job is dropped); for a recovering database the responses are fed
    /// into the recovery state machine.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, creating_job) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(creating_job_id) = creating_job {
                    let Some(job) = database
                        .creating_streaming_job_controls
                        .remove(&creating_job_id)
                    else {
                        // Unknown creating job: bug in debug builds, ignored in release.
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job {creating_job_id} in database {database_id}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            "ignore reset partial graph resp on non-existing creating job on running database"
                        );
                        return;
                    };
                    job.on_partial_graph_reset();
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }
496
497 pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
498 let (database_id, _) = from_partial_graph_id(partial_graph_id);
499 match self.databases.get_mut(&database_id).expect("should exist") {
500 DatabaseCheckpointControlStatus::Running(_) => {
501 unreachable!("should not have partial graph initialized when running")
502 }
503 DatabaseCheckpointControlStatus::Recovering(state) => {
504 state.partial_graph_initialized(partial_graph_id);
505 }
506 }
507 }
508
    /// Wait until some recovering database is ready for its next recovery
    /// transition, yielding the corresponding [`CheckpointControlEvent`].
    ///
    /// Running databases never produce events. The returned future must not be
    /// polled again after it resolves.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        // `self` is held in an `Option` so the exclusive borrow can be moved out
        // whole once a database becomes ready; polling afterwards is a bug.
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
546}
547
/// Lifecycle status of a single database's checkpoint control.
pub(crate) enum DatabaseCheckpointControlStatus {
    /// Barriers are being injected and collected normally.
    Running(DatabaseCheckpointControl),
    /// The database is going through recovery; in-flight barrier state was discarded.
    Recovering(DatabaseRecoveringState),
}
552
553impl DatabaseCheckpointControlStatus {
554 fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
555 match self {
556 DatabaseCheckpointControlStatus::Running(state) => Some(state),
557 DatabaseCheckpointControlStatus::Recovering(_) => None,
558 }
559 }
560
561 fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
562 match self {
563 DatabaseCheckpointControlStatus::Running(state) => Some(state),
564 DatabaseCheckpointControlStatus::Recovering(_) => None,
565 }
566 }
567
568 fn may_have_snapshot_backfilling_jobs(&self) -> bool {
569 self.running_state()
570 .map(|database| !database.creating_streaming_job_controls.is_empty())
571 .unwrap_or(true) }
573
574 fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
575 match self {
576 DatabaseCheckpointControlStatus::Running(state) => state,
577 DatabaseCheckpointControlStatus::Recovering(_) => {
578 panic!("should be at running: {}", reason)
579 }
580 }
581 }
582}
583
/// Prometheus metrics for one database's barrier checkpointing, with label
/// guards keyed by the database id.
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}
589
590impl DatabaseCheckpointControlMetrics {
591 fn new(database_id: DatabaseId) -> Self {
592 let database_id_str = database_id.to_string();
593 let barrier_latency = GLOBAL_META_METRICS
594 .barrier_latency
595 .with_guarded_label_values(&[&database_id_str]);
596 let in_flight_barrier_nums = GLOBAL_META_METRICS
597 .in_flight_barrier_nums
598 .with_guarded_label_values(&[&database_id_str]);
599 let all_barrier_nums = GLOBAL_META_METRICS
600 .all_barrier_nums
601 .with_guarded_label_values(&[&database_id_str]);
602 Self {
603 barrier_latency,
604 in_flight_barrier_nums,
605 all_barrier_nums,
606 }
607 }
608}
609
/// Barrier checkpoint state of a single running database.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    pub(super) state: BarrierWorkerState,

    /// Injected-but-uncommitted barriers, keyed and ordered by prev epoch.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// Prev epoch of the barrier currently being committed, if any.
    completing_barrier: Option<u64>,

    /// Latest committed epoch; `None` before the first commit of this database.
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    /// Controls of jobs still being created (e.g. snapshot backfill), by job id.
    pub creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    metrics: DatabaseCheckpointControlMetrics,
}
629
630impl DatabaseCheckpointControl {
631 fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
632 Self {
633 database_id,
634 state: BarrierWorkerState::new(),
635 command_ctx_queue: Default::default(),
636 completing_barrier: None,
637 committed_epoch: None,
638 database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
639 creating_streaming_job_controls: Default::default(),
640 metrics: DatabaseCheckpointControlMetrics::new(database_id),
641 }
642 }
643
644 pub(crate) fn recovery(
645 database_id: DatabaseId,
646 state: BarrierWorkerState,
647 committed_epoch: u64,
648 database_info: InflightDatabaseInfo,
649 creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
650 ) -> Self {
651 Self {
652 database_id,
653 state,
654 command_ctx_queue: Default::default(),
655 completing_barrier: None,
656 committed_epoch: Some(committed_epoch),
657 database_info,
658 creating_streaming_job_controls,
659 metrics: DatabaseCheckpointControlMetrics::new(database_id),
660 }
661 }
662
663 pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
664 !self.database_info.contains_worker(worker_id as _)
665 && self
666 .creating_streaming_job_controls
667 .values()
668 .all(|job| job.is_valid_after_worker_err(worker_id))
669 }
670
671 fn total_command_num(&self) -> usize {
672 self.command_ctx_queue.len()
673 + match &self.completing_barrier {
674 Some(_) => 1,
675 None => 0,
676 }
677 }
678
679 fn update_barrier_nums_metrics(&self) {
681 self.metrics.in_flight_barrier_nums.set(
682 self.command_ctx_queue
683 .values()
684 .filter(|x| x.state.is_inflight())
685 .count() as i64,
686 );
687 self.metrics
688 .all_barrier_nums
689 .set(self.total_command_num() as i64);
690 }
691
    /// Enqueue an injected barrier command, recording the set of creating jobs
    /// whose collection must be awaited before this barrier can complete.
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        // Latency timer runs from enqueue until the barrier finishes.
        let timer = self.metrics.barrier_latency.start_timer();

        // Barriers must be enqueued in contiguous epoch order: the new barrier's
        // prev epoch equals the last enqueued barrier's curr epoch.
        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    is_collected: false,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }
728
    /// Mark a barrier as collected, either for the database's own graph
    /// (`creating_job_id == None`) or for one of its creating jobs.
    fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let prev_epoch = collected_barrier.epoch.prev;
        tracing::trace!(
            prev_epoch,
            partial_graph_id = %partial_graph_id,
            "barrier collected"
        );
        let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
        assert_eq!(self.database_id, database_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(!node.state.is_collected);
                    node.state.is_collected = true;
                    node.state
                        .resps
                        .extend(collected_barrier.resps.into_values());
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {:?}",
                        prev_epoch, collected_barrier
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(collected_barrier);
                if should_merge_to_upstream {
                    // The creating job has caught up; force a checkpoint so it
                    // can be merged back into the upstream graph.
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }
773
774 fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
776 self.command_ctx_queue
777 .values()
778 .filter(|x| x.state.is_inflight())
779 .count()
780 < in_flight_barrier_nums
781 }
782}
783
784impl DatabaseCheckpointControl {
785 fn collect_backfill_pinned_upstream_log_epoch(
787 &self,
788 ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
789 self.creating_streaming_job_controls
790 .iter()
791 .map(|(job_id, creating_job)| (*job_id, creating_job.pinned_upstream_log_epoch()))
792 .collect()
793 }
794
    /// Collect `(upstream_fragment_id, downstream_fragment_id)` no-shuffle edges
    /// from the running graph and from all creating jobs, for reschedule checks.
    fn collect_no_shuffle_fragment_relations_for_reschedule_check(
        &self,
    ) -> Vec<(FragmentId, FragmentId)> {
        let mut no_shuffle_relations = Vec::new();
        for fragment in self.database_info.fragment_infos() {
            let downstream_fragment_id = fragment.fragment_id;
            // A `Merge` node with a NoShuffle upstream dispatcher marks a
            // no-shuffle edge into this fragment.
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
                    && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
                {
                    no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
                }
                true
            });
        }

        for creating_job in self.creating_streaming_job_controls.values() {
            creating_job.collect_no_shuffle_fragment_relations(&mut no_shuffle_relations);
        }
        no_shuffle_relations
    }
816
    /// Compute the jobs that must not be rescheduled while creating jobs are in
    /// flight: fragments blocked by creating jobs, closed over no-shuffle
    /// connectivity, then mapped back to their owning jobs.
    fn collect_reschedule_blocked_jobs_for_creating_jobs_inflight(
        &self,
    ) -> MetaResult<HashSet<JobId>> {
        let mut initial_blocked_fragment_ids = HashSet::new();
        for creating_job in self.creating_streaming_job_controls.values() {
            creating_job.collect_reschedule_blocked_fragment_ids(&mut initial_blocked_fragment_ids);
        }

        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
        if !initial_blocked_fragment_ids.is_empty() {
            // No-shuffle fragments are rescheduled as a unit, so expand the
            // blocked set to whole no-shuffle connected components.
            let no_shuffle_relations =
                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
            let (forward_edges, backward_edges) =
                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
            let initial_blocked_fragment_ids: Vec<_> =
                initial_blocked_fragment_ids.iter().copied().collect();
            for ensemble in find_no_shuffle_graphs(
                &initial_blocked_fragment_ids,
                &forward_edges,
                &backward_edges,
            )? {
                blocked_fragment_ids.extend(ensemble.fragments());
            }
        }

        let mut blocked_job_ids = HashSet::new();
        blocked_job_ids.extend(
            blocked_fragment_ids
                .into_iter()
                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
        );
        Ok(blocked_job_ids)
    }
850
    /// Of the jobs touched by `reschedules` / `fragment_actors`, return those
    /// that appear in `blocked_job_ids` (i.e. the reschedule must be rejected).
    fn collect_reschedule_blocked_job_ids(
        &self,
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        blocked_job_ids: &HashSet<JobId>,
    ) -> HashSet<JobId> {
        // A reschedule affects the rescheduled fragments themselves plus their
        // direct downstream and upstream-dispatcher fragments.
        let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
        affected_fragment_ids.extend(fragment_actors.keys().copied());
        for reschedule in reschedules.values() {
            affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
            affected_fragment_ids.extend(
                reschedule
                    .upstream_fragment_dispatcher_ids
                    .iter()
                    .map(|(fragment_id, _)| *fragment_id),
            );
        }

        affected_fragment_ids
            .into_iter()
            .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
            .filter(|job_id| blocked_job_ids.contains(job_id))
            .collect()
    }
875
    /// Try to assemble the next barrier completion work for this database,
    /// merging it into `task` (creating the task lazily on first use).
    ///
    /// Handles, in order: completable epochs of creating jobs (including jobs
    /// that just finished), then the oldest fully-collected barrier of the
    /// database's own queue (at most one checkpoint barrier per call).
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut PartialGraphManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        // `(job_id, epoch, resps, create_info)` of creating jobs with a completable epoch.
        let mut creating_jobs_task = vec![];
        // Creating jobs can only complete once the database has a committed epoch.
        if let Some(committed_epoch) = self.committed_epoch {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier, committed_epoch)
                {
                    let create_info = match status {
                        CompleteJobType::First(info) => Some(info),
                        CompleteJobType::Normal => None,
                        CompleteJobType::Finished => {
                            // Finished jobs are folded into the upstream barrier below.
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, create_info));
                }
            }
            // Tear down the partial graphs of finished jobs (when a manager is provided).
            if !finished_jobs.is_empty()
                && let Some((_, partial_graph_manager)) = &mut context
            {
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, _, _)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            // Move each finished job's responses/tracking into the epoch node it
            // finished at, and stop waiting on it for that epoch.
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        // Only one database barrier may be completing at a time.
        assert!(self.completing_barrier.is_none());
        // Pop fully-collected barriers in epoch order; non-checkpoint barriers are
        // acked immediately, the first checkpoint barrier becomes the commit task.
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.is_collected);

                self.handle_refresh_table_info(task, &node);

                self.database_info.apply_collected_command(
                    &node.command_ctx.command,
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    // Non-checkpoint barrier: notify and keep scanning the queue.
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    // If jobs are waiting to finish and no checkpoint barrier is
                    // queued, request one so they can be committed.
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.database_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                // Checkpoint barrier: fold finished jobs and staged commit info
                // into the completion task.
                let mut staging_commit_info = self.database_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        // Append the completable creating-job epochs gathered above.
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, create_info) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    create_info.as_ref(),
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch, create_info));
            }
        }
    }
1021
1022 fn ack_completed(
1023 &mut self,
1024 command_prev_epoch: Option<u64>,
1025 creating_job_epochs: Vec<(JobId, u64)>,
1026 ) {
1027 {
1028 if let Some(prev_epoch) = self.completing_barrier.take() {
1029 assert_eq!(command_prev_epoch, Some(prev_epoch));
1030 self.committed_epoch = Some(prev_epoch);
1031 } else {
1032 assert_eq!(command_prev_epoch, None);
1033 };
1034 for (job_id, epoch) in creating_job_epochs {
1035 self.creating_streaming_job_controls
1036 .get_mut(&job_id)
1037 .expect("should exist")
1038 .ack_completed(epoch)
1039 }
1040 }
1041 }
1042
1043 fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
1044 let list_finished_info = node
1045 .state
1046 .resps
1047 .iter()
1048 .flat_map(|resp| resp.list_finished_sources.clone())
1049 .collect::<Vec<_>>();
1050 if !list_finished_info.is_empty() {
1051 let task = task.get_or_insert_default();
1052 task.list_finished_source_ids.extend(list_finished_info);
1053 }
1054
1055 let load_finished_info = node
1056 .state
1057 .resps
1058 .iter()
1059 .flat_map(|resp| resp.load_finished_sources.clone())
1060 .collect::<Vec<_>>();
1061 if !load_finished_info.is_empty() {
1062 let task = task.get_or_insert_default();
1063 task.load_finished_source_ids.extend(load_finished_info);
1064 }
1065
1066 let refresh_finished_table_ids: Vec<JobId> = node
1067 .state
1068 .resps
1069 .iter()
1070 .flat_map(|resp| {
1071 resp.refresh_finished_tables
1072 .iter()
1073 .map(|table_id| table_id.as_job_id())
1074 })
1075 .collect::<Vec<_>>();
1076 if !refresh_finished_table_ids.is_empty() {
1077 let task = task.get_or_insert_default();
1078 task.refresh_finished_table_job_ids
1079 .extend(refresh_finished_table_ids);
1080 }
1081 }
1082}
1083
/// The in-flight state of one barrier epoch in the per-database checkpoint
/// control queue.
struct EpochNode {
    /// Timer started when the barrier was enqueued; handed over to the
    /// complete-barrier task together with `command_ctx` (presumably observes
    /// enqueue-to-commit latency — prometheus `HistogramTimer` semantics).
    enqueue_time: HistogramTimer,

    /// Collection/completion state of this barrier epoch.
    state: BarrierEpochState,

    /// Context of the command injected together with this barrier.
    command_ctx: CommandContext,
    /// Notifiers to be notified as this barrier progresses or fails.
    notifiers: Vec<Notifier>,
}
1097
#[derive(Debug)]
struct BarrierEpochState {
    /// Whether the barrier of this epoch has been collected (see
    /// `is_inflight`: an epoch stays in flight until collected).
    is_collected: bool,

    /// Barrier-complete responses accumulated for this epoch.
    resps: Vec<BarrierCompleteResponse>,

    /// Creating (snapshot-backfill) jobs this epoch must still wait on before
    /// it stops being in flight.
    creating_jobs_to_wait: HashSet<JobId>,

    /// Jobs that finished at this epoch, keyed by job id, with their collected
    /// responses and the tracking handle of the finished job.
    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}
1109
1110impl BarrierEpochState {
1111 fn is_inflight(&self) -> bool {
1112 !self.is_collected || !self.creating_jobs_to_wait.is_empty()
1113 }
1114}
1115
impl DatabaseCheckpointControl {
    /// Inject the next barrier (epoch) for this database, optionally carrying a
    /// `command`, after a series of pre-checks that may reject the command or
    /// short-circuit without injecting anything.
    ///
    /// On success the barrier is enqueued via `enqueue_command`. On every
    /// rejection path the `notifiers` are consumed (failed, or — for the
    /// empty-database no-op — started and collected) and `Ok(())` is returned.
    /// Only a failed `apply_command` returns `Err`.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        // The new barrier's epoch immediately follows the in-flight prev epoch.
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        // Split the optional command into the command itself and its notifiers.
        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        // A reschedule intent must carry a resolved plan by the time it is injected.
        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        // Special handling when dropping jobs that are still creating via
        // snapshot backfill.
        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                // A multi-job drop must not target a creating snapshot-backfill
                // job: reject the whole command and fail its notifiers.
                for job_to_cancel in streaming_job_ids {
                    if self
                        .creating_streaming_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
                            the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                // A single-job drop of a creating job may be fully handled by the
                // creating-job control itself; if `drop` returns true we are done.
                && let Some(creating_job) =
                    self.creating_streaming_job_controls.get_mut(job_to_drop)
                && creating_job.drop(&mut notifiers, partial_graph_manager)
            {
                return Ok(());
            }
        }

        // Reject multi-job throttle commands that touch any creating
        // snapshot-backfill job.
        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(creating_job_id) = jobs
                .iter()
                .find(|job| self.creating_streaming_job_controls.contains_key(*job))
        {
            warn!(
                job_id = %creating_job_id,
                "ignore multi-job throttle command on creating snapshot backfill streaming job"
            );
            for notifier in notifiers {
                notifier
                    .notify_start_failed(anyhow!("cannot alter rate limit for snapshot backfill streaming job with other jobs, \
                the original rate limit will be kept recovery.").into());
            }
            return Ok(());
        };

        // Reject reschedules whose fragments relate to creating jobs with
        // unreschedulable backfill fragments.
        if let Some(Command::RescheduleIntent {
            reschedule_plan: Some(reschedule_plan),
            ..
        }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            let blocked_job_ids =
                self.collect_reschedule_blocked_jobs_for_creating_jobs_inflight()?;
            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
                &reschedule_plan.reschedules,
                &reschedule_plan.fragment_actors,
                &blocked_job_ids,
            );
            if !blocked_reschedule_job_ids.is_empty() {
                warn!(
                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
                );
                for notifier in notifiers {
                    notifier.notify_start_failed(
                        anyhow!(
                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
                            blocked_reschedule_job_ids
                        )
                        .into(),
                    );
                }
                return Ok(());
            }
        }

        // With no normal job in the database (and no CreateStreamingJob command)
        // there is nothing to inject a barrier into: notify the command as
        // started and collected, and return without injecting.
        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        // Snapshot-backfill job creation is not allowed while the database is paused.
        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        // Allocate the barrier info for this epoch and record the epoch on the
        // tracing spans.
        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        // Apply the command to the database state. On failure, fail all
        // notifiers and propagate the error (fail point for testing injection
        // failures).
        let ApplyCommandInfo {
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            command,
        } = match self.apply_command(
            command,
            &mut notifiers,
            &barrier_info,
            partial_graph_manager,
            hummock_version_stats,
        ) {
            Ok(info) => info,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // The command has been applied: the barrier is considered started.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        // Enqueue the barrier so it is tracked until collected and completed.
        self.enqueue_command(command_ctx, notifiers, jobs_to_wait);

        Ok(())
    }
}