use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcProgress;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{Command, CreateStreamingJobType};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};

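/// Top-level barrier/checkpoint state on the meta node. Tracks the per-database
/// checkpoint control of every database, the latest committed `HummockVersionStats`,
/// and the configured cap on concurrently in-flight barriers.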
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    /// Maximum number of in-flight barriers allowed in each database's command queue.
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

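    /// Rebuild the control state after recovery: successfully recovered databases
    /// resume as `Running`, while failed ones enter the recovering (resetting) state.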
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

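    /// Acknowledge a finished barrier-completion task, refreshing the cached
    /// `HummockVersionStats` and forwarding the acked epochs to each database.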
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

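    /// Assemble the next `CompleteBarrierTask`, if any, by polling every running database.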
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

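    /// Route a `BarrierCompleteResponse` from a worker to the database that owns it,
    /// whether that database is running or still recovering.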
    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = resp.database_id;
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

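    /// Handle a newly scheduled barrier. For command barriers this resolves the target
    /// database (creating it for a normal `CreateStreamingJob` on an empty database),
    /// fills in cross-database snapshot-backfill epochs, and injects the barrier; for
    /// periodic barriers it injects only when under the in-flight barrier limit.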
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

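    /// Collect the DDL progress of all creating jobs, including snapshot-backfill jobs
    /// tracked separately in `creating_streaming_job_controls`.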
    pub(crate) fn gen_ddl_progress(&self) -> HashMap<JobId, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(database_checkpoint_control.database_info.gen_ddl_progress());
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(creating_job.job_id, creating_job.gen_ddl_progress())]);
            }
        }
        progress
    }

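    /// Collect CDC backfill progress from every running database.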
    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

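    /// Return the databases that cannot stay valid after the failure of `worker_id`
    /// and therefore need to enter recovery.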
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .database_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

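    /// Drain the command queues of all running databases on error, failing every
    /// pending notifier with `err` and stopping the enqueued latency timers.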
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }

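    /// Iterate over each database with inflight state, yielding its id together with
    /// the creating jobs that are still consuming upstream data.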
    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

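/// State-transition events emitted by [`CheckpointControl::next_event`] for databases
/// progressing through recovery.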
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
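    /// Handle a `ResetDatabaseResponse` from a worker; only legal while the database
    /// is recovering.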
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = resp.database_id;
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

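    /// Await the next recovery state-transition event from any recovering database.
    /// The returned future resolves at most once and must not be polled again after
    /// it yields `Poll::Ready`.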
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

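/// Status of a single database: either running normally or going through recovery.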
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

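/// Per-database barrier metrics, labeled by database id.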
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

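/// Controls the concurrent execution of commands within a single database.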
pub(crate) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    pub(super) state: BarrierWorkerState,

    /// Enqueued barriers in order, keyed by `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The `prev_epoch` of the barrier currently being committed, if any.
    completing_barrier: Option<u64>,

    /// The latest `prev_epoch` that has been successfully committed.
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    pub(super) creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            creating_streaming_job_controls: Default::default(),
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            creating_streaming_job_controls,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    /// Update the in-flight and total barrier count gauges.
    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

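    /// Enqueue an injected barrier into the command queue and start its latency timer,
    /// asserting that its epochs are contiguous with the previously enqueued barrier.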
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

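    /// Record a worker's response for the barrier at `prev_epoch`. Responses for a
    /// creating job's partial graph are forwarded to that job; when the job becomes
    /// ready to merge into the upstream graph, the next barrier is forced to be a
    /// checkpoint.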
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            %worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(node.state.node_to_collect.remove(&worker_id).is_some());
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

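    /// Whether a new barrier may be injected, i.e. the number of in-flight barriers
    /// is below `in_flight_barrier_nums`.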
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

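    /// Whether this database can remain valid after the failure of `worker_id`,
    /// i.e. no enqueued barrier was still waiting to be collected from that worker.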
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
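    /// For each creating snapshot-backfill job, return the upstream log epoch it pins
    /// together with the upstream tables it consumes.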
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .map(|(job_id, creating_job)| {
                let progress_epoch = creating_job.pinned_upstream_log_epoch();
                (
                    *job_id,
                    (
                        progress_epoch,
                        creating_job.snapshot_backfill_upstream_tables.clone(),
                    ),
                )
            })
            .collect()
    }

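    /// Try to assemble the next completion task: first gather completed epochs of
    /// creating jobs, then pop fully collected barriers from the head of the command
    /// queue; a checkpoint barrier becomes the `completing_barrier` to be committed.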
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        // Collect the completion progress of creating (snapshot-backfill) jobs.
        {
            let mut finished_jobs = Vec::new();
            // The min `prev_epoch` among barriers still enqueued for this database.
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs.iter().map(|(job_id, _, _)| *job_id).collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                // The creating job has fully caught up with upstream; move it out of
                // `creating_streaming_job_controls` and commit it with this barrier.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                self.handle_refresh_table_info(task, &node);

                self.database_info.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.database_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info = self.database_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch));
            }
        }
    }

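    /// Acknowledge that the completing barrier (if any) has been committed, advancing
    /// `committed_epoch`, and forward the acked epochs to the creating jobs.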
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

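    /// Propagate source list/load completion and table-refresh completion info from
    /// the collected responses into the pending completion task.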
    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

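/// The per-epoch state of an enqueued barrier, a node for concurrent checkpointing.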
struct EpochNode {
    /// Timer for recording the latency of this barrier, started on enqueue.
    enqueue_time: HistogramTimer,

    /// Collection state of this barrier, i.e. whether it is still in-flight.
    state: BarrierEpochState,

    /// Context of this barrier's command, used for post-collection work.
    command_ctx: CommandContext,
    /// Notifiers of this barrier.
    notifiers: Vec<Notifier>,
}

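/// Collection state of a single barrier epoch.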
#[derive(Debug)]
struct BarrierEpochState {
    /// Workers that have yet to report collection of this barrier.
    node_to_collect: NodeToCollect,

    /// Responses collected so far from the workers.
    resps: Vec<BarrierCompleteResponse>,

    /// Creating (snapshot-backfill) jobs this barrier still waits on.
    creating_jobs_to_wait: HashSet<JobId>,

    /// Jobs that finished at this barrier, with their responses and tracking info.
    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
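    /// Handle a new barrier for this database: validate the command against the
    /// current state (rejecting e.g. cancels or reschedules that conflict with
    /// creating jobs), apply it, inject the barrier through the control stream, and
    /// enqueue it for collection.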
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = %job_to_cancel,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule when creating streaming job with snapshot backfill",
                    )
                    .into(),
                );
            }
            return Ok(());
        }

        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        let ApplyCommandInfo {
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            node_to_collect,
            command,
        } = match self.apply_command(
            command,
            &barrier_info,
            control_stream_manager,
            hummock_version_stats,
        ) {
            Ok(info) => info,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}