use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::FragmentId;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::SharedActorInfos;
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};

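/// Top-level barrier checkpoint state kept by the meta barrier manager. It tracks the
/// per-database control status ([`DatabaseCheckpointControlStatus`]), the latest
/// [`HummockVersionStats`], and the configured in-flight barrier limit.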
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

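    /// Scan all running databases and gather the barriers that are ready to complete into a
    /// single [`CompleteBarrierTask`], if any.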
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = resp.database_id;
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

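    /// Dispatch a newly scheduled barrier to the target database. For a `CreateStreamingJob`
    /// command this also resolves the committed epochs of cross-database upstream tables and,
    /// when the database has no checkpoint control yet, initializes one together with its
    /// partial graph.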
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database
                                .state
                                .inflight_graph_info
                                .contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                            self.env.cdc_table_backfill_tracker(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_ddl_progress(&self) -> HashMap<JobId, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .gen_ddl_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(creating_job.job_id, creating_job.gen_ddl_progress())]);
            }
        }
        progress
    }

    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }

    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

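/// Events emitted by [`CheckpointControl::next_event`] when a recovering database is ready to
/// move to the next recovery stage.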
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = resp.database_id;
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

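/// Per-database control status: either normally running or under recovery.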
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

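/// Barrier metrics labeled by database id.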
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

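/// Barrier checkpoint state of a single database, including the queue of in-flight barriers
/// and the controls of streaming jobs that are still being created with snapshot backfill.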
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

    /// Barriers that have been injected and are awaiting collection, keyed by `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The `prev_epoch` of the barrier whose completion is currently in progress, if any.
    completing_barrier: Option<u64>,

    /// The most recently committed epoch of this database, if any.
    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(
        database_id: DatabaseId,
        shared_actor_infos: SharedActorInfos,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(database_id, shared_actor_infos),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

    #[expect(clippy::type_complexity)]
    fn jobs_to_merge(
        &self,
    ) -> Option<HashMap<JobId, (HashSet<TableId>, HashMap<FragmentId, InflightFragmentInfo>)>> {
        let mut job_ids_to_merge = HashMap::new();

        for (job_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
                job_ids_to_merge.insert(
                    *job_id,
                    (
                        creating_streaming_job
                            .snapshot_backfill_upstream_tables
                            .clone(),
                        graph_info.clone(),
                    ),
                );
            }
        }
        if job_ids_to_merge.is_empty() {
            None
        } else {
            Some(job_ids_to_merge)
        }
    }

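    /// Enqueue a freshly injected barrier into `command_ctx_queue`, asserting that its prev
    /// epoch matches the curr epoch of the previously enqueued barrier.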
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            %worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(node.state.node_to_collect.remove(&worker_id).is_some());
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

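    /// Returns whether the in-flight barriers of this database can still be collected after the
    /// given worker fails.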
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(job_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *job_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

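    /// Collect the next barriers of this database that are ready to complete into `task`. This
    /// covers epochs of creating (snapshot backfill) jobs that can start completing as well as
    /// the earliest fully collected barrier in `command_ctx_queue`, if any.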
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs.iter().map(|(job_id, _, _)| *job_id).collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                self.handle_refresh_table_info(task, &node);

                self.state.inflight_graph_info.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    node.state.resps.iter(),
                    hummock_version_stats,
                );
                let finished_cdc_backfill = self
                    .cdc_table_backfill_tracker
                    .apply_collected_command(&node.state.resps);
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.state.inflight_graph_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info =
                    self.state.inflight_graph_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(finished_cdc_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id].state_table_ids(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch));
            }
        }
    }

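    /// Mark the currently completing barrier (and the given epochs of creating jobs) as
    /// committed after the complete task has finished.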
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

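/// An in-flight barrier of a database, kept in `command_ctx_queue` keyed by its prev epoch.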
struct EpochNode {
    /// Timer for the barrier latency metric, started when the barrier is enqueued.
    enqueue_time: HistogramTimer,

    /// Collection state of the barrier.
    state: BarrierEpochState,

    /// Context of the command carried by this barrier.
    command_ctx: CommandContext,
    /// Notifiers to be notified when the barrier is collected or fails.
    notifiers: Vec<Notifier>,
}

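/// Collection state of an in-flight barrier.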
#[derive(Debug)]
struct BarrierEpochState {
    /// Worker nodes from which the barrier is yet to be collected.
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<JobId>,

    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
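    /// Handle a newly scheduled barrier for this database: apply the command to the in-flight
    /// graph state, inject the barrier to the workers through the control stream, and enqueue
    /// it for collection. If the command cannot be applied (e.g. cancelling a snapshot-backfill
    /// job that is still being created), the notifiers are failed and the barrier is skipped.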
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = %job_to_cancel,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }

                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule when creating streaming job with snapshot backfill",
                    )
                    .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .inflight_graph_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        barrier_info.prev_epoch(),
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.state.inflight_graph_info.shared_actor_infos.upsert(
                        self.database_id,
                        job.graph_info()
                            .values()
                            .map(|fragment| (fragment, job.job_id)),
                    );

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes =
                    self.state.inflight_graph_info.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let command = command;

        let (
            pre_applied_graph_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            prev_paused_reason,
        ) = self.state.apply_command(command.as_ref());

        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
        }

        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
            self.database_id,
            command.as_ref(),
            &barrier_info,
            prev_paused_reason,
            &pre_applied_graph_info,
            &self.state.inflight_graph_info,
            &mut edges,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}