use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcProgress;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{Command, CreateStreamingJobType};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};

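/// Controls the concurrent execution and completion of barriers, tracking one
/// [`DatabaseCheckpointControlStatus`] per database.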
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

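    /// Rebuild the control state after recovery. Databases in `failed_databases`
    /// re-enter the resetting state instead of running.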
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

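    /// Acknowledge that a barrier-completion task has finished, updating the
    /// version stats and the per-database completion state.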
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running(
                    "should have waited for the completing command before entering recovery",
                )
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

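    /// Collect the next barrier-completion task, if any, across all running databases.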
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

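    /// Route a `BarrierCompleteResponse` from a worker to the database it belongs to.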
    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = resp.database_id;
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

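    /// Handle a newly scheduled barrier: apply its command (if any) to the target
    /// database and inject the barrier through the control stream.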
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross-db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross-db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have a command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "a new database graph can only be created by a normal CreateStreamingJob command, but got command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

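    /// Collect DDL progress for all creating jobs across running databases.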
    pub(crate) fn gen_ddl_progress(&self) -> HashMap<JobId, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(database_checkpoint_control.database_info.gen_ddl_progress());
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(creating_job.job_id, creating_job.gen_ddl_progress())]);
            }
        }
        progress
    }

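    /// Collect CDC backfill progress across running databases.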
    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

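    /// Return the databases that can no longer stay valid after the failure of
    /// `worker_id` and must therefore enter recovery.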
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .database_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

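    /// Notify all uncompleted barriers of the error and drain the queued commands.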
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }

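    /// Iterate over each database together with the ids of its creating jobs
    /// that are still consuming from upstream.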
    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

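/// Events emitted while a database transitions between recovery stages.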
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
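    /// Handle the response of a reset-database request issued during recovery.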
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = resp.database_id;
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

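    /// Wait for the next state-transition event from any recovering database.
    /// The returned future must not be polled again after it resolves.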
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after returning ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

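/// The checkpoint state of a single database: either running normally or
/// going through recovery.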
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

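/// Barrier metrics reported per database.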
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

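/// Controls the concurrent execution of commands within a single database.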
pub(crate) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    pub(super) state: BarrierWorkerState,

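    /// The state and messages of in-flight barriers in order, keyed by `prev_epoch`.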
    command_ctx_queue: BTreeMap<u64, EpochNode>,
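    /// The `prev_epoch` of the barrier that is currently being committed, if any.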
    completing_barrier: Option<u64>,

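    /// The `prev_epoch` of the latest committed barrier, if any.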
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    pub(super) creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            creating_streaming_job_controls: Default::default(),
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

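    /// Rebuild the per-database control state from the info collected during recovery.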
    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            creating_streaming_job_controls,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

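    /// Enqueue an injected barrier, together with the nodes to collect responses
    /// from and the creating jobs it has to wait for.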
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

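    /// Mark the barrier of `prev_epoch` as collected from `worker_id`, and force
    /// a checkpoint in the next barrier if a creating job becomes ready to merge
    /// into its upstream graph.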
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            %worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(node.state.node_to_collect.remove(&worker_id).is_some());
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collected barrier for non-existent epoch {} from worker {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

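    /// Whether the number of in-flight barriers is still below `in_flight_barrier_nums`,
    /// i.e. whether a new barrier can be injected.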
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

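    /// Whether this database can keep running after `worker_id` has failed.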
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
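    /// For each snapshot-backfill job, collect the upstream log epoch it pins
    /// together with the upstream tables it reads.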
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(job_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *job_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

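    /// Pop fully collected barriers from the front of the queue and assemble the
    /// next completion task, merging in the epochs of creating jobs.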
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs.iter().map(|(job_id, _, _)| *job_id).collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                self.handle_refresh_table_info(task, &node);

                self.database_info.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.database_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info = self.database_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch));
            }
        }
    }

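    /// Acknowledge that the completing barrier (and the given creating-job epochs)
    /// have been committed.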
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

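    /// Gather finished-source and refreshed-table info from the collected
    /// responses into the completion task.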
    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

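/// The state and messages of a barrier in the queue; one node per concurrent checkpoint.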
struct EpochNode {
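    /// Timer recording barrier latency, started on enqueue and observed on completion.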
    enqueue_time: HistogramTimer,

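    /// Collection state: whether this barrier is still in-flight or fully collected.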
    state: BarrierEpochState,

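    /// Context of the command carried by this barrier, used to commit the epoch.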
    command_ctx: CommandContext,
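    /// Notifiers to wake when this barrier is collected or fails.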
    notifiers: Vec<Notifier>,
}

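/// Collection state of an enqueued barrier.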
#[derive(Debug)]
struct BarrierEpochState {
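    /// Workers that have not yet reported collection of this barrier.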
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<JobId>,

    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
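    /// Handle a new barrier scheduled for this database: validate the command,
    /// compute the next epoch, inject the barrier, and enqueue it for collection.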
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = %job_to_cancel,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel a creating streaming job; it will continue until it is created or recovery is triggered").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule when creating streaming job with snapshot backfill"
                    )
                    .into(),
                );
            }
            return Ok(());
        }

        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill jobs when there is no normal job in the database"
            );
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        }

        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused")
                        .into(),
                );
            }
            return Ok(());
        }

        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        let ApplyCommandInfo {
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            node_to_collect,
            command,
        } = match self.apply_command(
            command,
            &barrier_info,
            control_stream_manager,
            hummock_version_stats,
        ) {
            Ok(info) => info,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}