use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::SharedActorInfos;
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};
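
/// Checkpoint/barrier control for all databases managed by this meta node.
///
/// Each database is either `Running` with its own [`DatabaseCheckpointControl`] or
/// `Recovering`. The struct also caches the latest [`HummockVersionStats`] and the
/// configured limit on in-flight barriers shared by all databases.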
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }
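
    /// Rebuild the checkpoint control after recovery: recovered databases enter the
    /// `Running` state, while databases in `failed_databases` start resetting in the
    /// `Recovering` state.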
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }
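
    /// Collect the next ready [`CompleteBarrierTask`] across all running databases, if any.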
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = resp.database_id;
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }
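
    /// Handle a newly scheduled barrier: resolve cross-database snapshot backfill epochs,
    /// lazily create the per-database control on the first normal `CreateStreamingJob` for a
    /// new database, and inject the barrier into the target database.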
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database
                                .state
                                .database_info
                                .contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                            self.env.cdc_table_backfill_tracker(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but got command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_ddl_progress(&self) -> HashMap<JobId, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .state
                    .database_info
                    .gen_ddl_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(creating_job.job_id, creating_job.gen_ddl_progress())]);
            }
        }
        progress
    }
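
    /// Return the databases that cannot remain valid after an error on `worker_id`.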
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .database_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }

    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = resp.database_id;
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }
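
    /// Return a future resolving to the next state-transition event of a recovering database:
    /// either it is ready to enter initializing, or it is ready to go back to running.
    /// The future stays pending while no recovering database has a pending event.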
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}
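
/// Checkpoint control of a single database: the queue of injected barriers, the barrier
/// being committed, and the snapshot-backfill jobs that are still being created.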
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,
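
    /// Queue of injected barriers awaiting collection and completion, keyed and ordered by
    /// `prev_epoch`.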
    command_ctx_queue: BTreeMap<u64, EpochNode>,
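    /// `prev_epoch` of the barrier whose completion is currently being awaited, if any.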
    completing_barrier: Option<u64>,
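
    /// `prev_epoch` of the most recently committed barrier of this database.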
    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(
        database_id: DatabaseId,
        shared_actor_infos: SharedActorInfos,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(database_id, shared_actor_infos),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

    fn jobs_to_merge(&self) -> Option<HashMap<JobId, HashSet<TableId>>> {
        let mut job_ids_to_merge = HashMap::new();

        for (job_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(upstream_table_ids) = creating_streaming_job.should_merge_to_upstream() {
                job_ids_to_merge.insert(*job_id, upstream_table_ids.clone());
            }
        }
        if job_ids_to_merge.is_empty() {
            None
        } else {
            Some(job_ids_to_merge)
        }
    }
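
    /// Enqueue a newly injected barrier, asserting that its `prev_epoch` continues from the
    /// `curr_epoch` of the last enqueued barrier.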
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            %worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(node.state.node_to_collect.remove(&worker_id).is_some());
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(job_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *job_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs.iter().map(|(job_id, _, _)| *job_id).collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                self.handle_refresh_table_info(task, &node);

                self.state.database_info.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    node.state.resps.iter(),
                    hummock_version_stats,
                );
                let finished_cdc_backfill = self
                    .cdc_table_backfill_tracker
                    .apply_collected_command(&node.state.resps);
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.state.database_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info = self.state.database_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(finished_cdc_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch));
            }
        }
    }

    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}
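
/// A barrier that has been injected but not yet committed, together with its per-epoch state.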
struct EpochNode {
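    /// Timer started when the barrier is enqueued, backing the per-database barrier latency
    /// histogram.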
    enqueue_time: HistogramTimer,
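
    /// Collection state of the barrier: which workers and creating jobs are still pending.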
    state: BarrierEpochState,
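
    /// Context of the command carried by this barrier.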
    command_ctx: CommandContext,
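    /// Notifiers waiting on the collection and completion of this barrier.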
    notifiers: Vec<Notifier>,
}

#[derive(Debug)]
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<JobId>,

    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
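    /// Inject the next barrier of this database: validate the command against ongoing
    /// snapshot-backfill jobs, fill in backfill epochs, derive the mutation and actors to
    /// create via `apply_command`, inject the barrier through the control stream, and enqueue
    /// it for collection.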
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = %job_to_cancel,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule when creating streaming job with snapshot backfill",
                    )
                    .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .database_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not be set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        barrier_info.prev_epoch(),
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.state
                        .database_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.fragment_infos_with_job_id());

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes = self.state.database_info.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let mut finished_snapshot_backfill_job_fragments = HashMap::new();
        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
            if let Some(finished_job_fragments) = creating_job.on_new_command(
                control_stream_manager,
                command.as_ref(),
                &barrier_info,
            )? {
                finished_snapshot_backfill_job_fragments.insert(*job_id, finished_job_fragments);
            }
        }

        let command = command;

        let ApplyCommandInfo {
            node_actors,
            actors_to_create,
            mv_subscription_max_retention,
            table_ids_to_commit,
            table_ids_to_sync,
            jobs_to_wait,
            mutation,
        } = self.state.apply_command(
            command.as_ref(),
            finished_snapshot_backfill_job_fragments,
            &mut edges,
            control_stream_manager,
        );

        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);
        let node_to_collect = match control_stream_manager.inject_barrier(
            self.database_id,
            None,
            mutation,
            &barrier_info,
            &node_actors,
            table_ids_to_sync.iter().copied(),
            actors_to_create,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
1207}