use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::{SourceId, WorkerId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightStreamingJobInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{SourceChange, fill_snapshot_backfill_epoch};
use crate::{MetaError, MetaResult};

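/// Top-level coordinator of barrier checkpointing. It tracks one
/// [`DatabaseCheckpointControlStatus`] per database, so that each database can make
/// checkpoint progress, or recover, independently of the others.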
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

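    /// Rebuilds the control state after a meta-node recovery. Successfully recovered
    /// databases enter the `Running` state directly, while databases that failed
    /// during recovery are put back into the resetting phase of the recovery state
    /// machine.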
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for the completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

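    /// Collects the next ready [`CompleteBarrierTask`] across all running databases.
    /// Returns `None` when no database has a barrier ready to be committed.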
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

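    /// Routes a [`BarrierCompleteResponse`] from a worker to the owning database,
    /// whether it is running or recovering.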
    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = resp.database_id;
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

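    /// Handles a scheduled [`NewBarrier`]. A barrier carrying a command may lazily
    /// create the per-database control state for a database running its first
    /// streaming job; a plain periodic barrier for an unknown or recovering database
    /// is silently dropped.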
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database
                                .state
                                .inflight_graph_info
                                .contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                            self.env.cdc_table_backfill_tracker(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but got command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

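    /// Aggregates DDL progress of all creating streaming jobs across databases, keyed
    /// by job id.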
    pub(crate) fn gen_ddl_progress(&self) -> HashMap<JobId, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
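            // Progress of normal backfill, tracked by the mview progress tracker.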
            progress.extend(
                database_checkpoint_control
                    .create_mview_tracker
                    .gen_ddl_progress(),
            );
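            // Progress of creating jobs that use snapshot backfill.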
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.insert(creating_job.job_id, creating_job.gen_ddl_progress());
            }
        }
        progress
    }

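    /// Returns the databases that can no longer make progress after the failure of
    /// `worker_id` and therefore have to enter recovery.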
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

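    /// Fails all queued barriers with `err`, observes their latency timers, and
    /// aborts all in-progress materialized view creations.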
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
        self.databases.values_mut().for_each(|database| {
            if let Some(database) = database.running_state_mut() {
                database.create_mview_tracker.abort_all()
            }
        });
    }

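    /// Returns, per database, the ids of creating jobs that are still actively
    /// consuming upstream data (i.e. snapshot-backfill jobs not yet merged back).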
    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

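/// State-transition events surfaced by [`CheckpointControl::next_event`] when a
/// recovering database is ready to move to its next recovery stage.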
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = resp.database_id;
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

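    /// Polls the recovering databases and resolves as soon as one of them can take
    /// its next recovery action; stays pending while every database is running.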
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

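/// Per-database status: either checkpointing normally or going through recovery.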
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

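/// Barrier metrics of a single database, with the database id as the metric label.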
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

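/// Controls checkpointing for a single database: the queue of in-flight barriers,
/// the barrier currently being committed, and the creating streaming jobs that
/// checkpoint independently through their own partial graphs.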
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

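    /// In-flight and collected barriers in epoch order, keyed by `prev_epoch`.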
    command_ctx_queue: BTreeMap<u64, EpochNode>,
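    /// The barrier currently in the completing phase, as `Some(prev_epoch)`.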
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    create_mview_tracker: CreateMviewProgressTracker,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(
        database_id: DatabaseId,
        shared_actor_infos: SharedActorInfos,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(database_id, shared_actor_infos),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            create_mview_tracker: Default::default(),
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

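    /// Rebuilds the per-database control state from the information collected during
    /// recovery, resuming from `committed_epoch`.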
    pub(crate) fn recovery(
        database_id: DatabaseId,
        create_mview_tracker: CreateMviewProgressTracker,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            create_mview_tracker,
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

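    /// Returns the snapshot-backfill jobs that have caught up with their upstream and
    /// can be merged back into the main graph, or `None` if there is none.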
    fn jobs_to_merge(
        &self,
    ) -> Option<HashMap<JobId, (HashSet<TableId>, InflightStreamingJobInfo)>> {
        let mut table_ids_to_merge = HashMap::new();

        for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
                table_ids_to_merge.insert(
                    *table_id,
                    (
                        creating_streaming_job
                            .snapshot_backfill_upstream_tables
                            .clone(),
                        graph_info.clone(),
                    ),
                );
            }
        }
        if table_ids_to_merge.is_empty() {
            None
        } else {
            Some(table_ids_to_merge)
        }
    }

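    /// Enqueues an injected barrier, recording the workers and creating jobs it still
    /// has to be collected from. Barriers must be enqueued in epoch order.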
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

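    /// Marks a barrier as collected from one worker. Responses for the main graph are
    /// attached to the matching epoch node; responses for a creating job's partial
    /// graph are forwarded to that job, possibly forcing a checkpoint once the job is
    /// ready to be merged back.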
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(
                        node.state
                            .node_to_collect
                            .remove(&(worker_id as _))
                            .is_some()
                    );
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

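    /// Whether a new barrier may be injected, bounding the number of concurrent
    /// in-flight barriers by `in_flight_barrier_nums`.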
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

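    /// Whether this database can keep running after `worker_id` has failed, i.e. the
    /// failed worker has no barrier left to collect in any pending epoch.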
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
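    /// Collects, for each creating job that still pins an upstream log epoch, the
    /// pinned epoch together with its snapshot-backfill upstream tables.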
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(table_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *table_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

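    /// Moves this database's fully collected barriers into `task` for commit: first
    /// the completable epochs of creating jobs (including newly finished ones), then
    /// the oldest collected barrier of the main graph, if any.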
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (table_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*table_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs
                        .iter()
                        .map(|(table_id, _, _)| *table_id)
                        .collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                assert!(creating_streaming_job.is_finished());

                let mut source_backfill_fragments = HashMap::new();
                for info in creating_streaming_job.graph_info().fragment_infos() {
                    if let Some((source_id, upstream_source_fragment_id)) =
                        info.nodes.find_source_backfill()
                    {
                        source_backfill_fragments
                            .entry(source_id as SourceId)
                            .or_insert(BTreeSet::new())
                            .insert((info.fragment_id, upstream_source_fragment_id));
                    }
                }
                let source_change = if !source_backfill_fragments.is_empty() {
                    Some(SourceChange::CreateJobFinished {
                        finished_backfill_fragments: source_backfill_fragments,
                    })
                } else {
                    None
                };

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, source_change))
                        .is_none()
                );
            }
        }
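        // Only one barrier per database may be in the completing phase at a time.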
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                self.handle_refresh_table_info(task, &node);

                let staging_commit_info = self.create_mview_tracker.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.command_ctx.barrier_info,
                    &node.state.resps,
                    hummock_version_stats,
                );
                let finished_cdc_backfill = self
                    .cdc_table_backfill_tracker
                    .apply_collected_command(&node.state.resps);
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    assert!(staging_commit_info.is_none());
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.create_mview_tracker.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info =
                    staging_commit_info.expect("should be Some for checkpoint");
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(job_id, (resps, source_change))| {
                        node.state.resps.extend(resps);
                        staging_commit_info
                            .finished_jobs
                            .push(TrackingJob::new(job_id, source_change));
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(finished_cdc_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id].state_table_ids(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch));
            }
        }
    }

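    /// Acknowledges that the completing barrier and the given creating-job epochs
    /// have been committed, advancing `committed_epoch`.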
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

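    /// Extracts refresh-related progress from the collected responses (finished
    /// list/load sources and refresh-finished tables) into the complete-barrier task.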
    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

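/// The state and message of a barrier: one node of the concurrent-checkpoint queue.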
struct EpochNode {
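    /// Timer for the barrier latency histogram, started when the barrier is enqueued.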
    enqueue_time: HistogramTimer,

    state: BarrierEpochState,

    command_ctx: CommandContext,
    notifiers: Vec<Notifier>,
}

#[derive(Debug)]
struct BarrierEpochState {
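    /// Workers that have not yet reported collection of this barrier.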
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<JobId>,

    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, Option<SourceChange>)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
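    /// Handles a new barrier for this database: validates the command, applies it to
    /// the in-flight graph state, injects the barrier through the control stream, and
    /// enqueues it for collection.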
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = %job_to_cancel,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel a creating streaming job; it will continue creating until it is created or recovery is triggered").into());
                }
                return Ok(());
            }
        }

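        // Reschedule is rejected while any snapshot-backfill job is still being created.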
        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot reschedule when creating streaming job with snapshot backfill")
                        .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .inflight_graph_info
            .build_edge(command.as_ref(), &*control_stream_manager);

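        // For a newly created streaming job, fill in the epochs that its backfill
        // will snapshot-read from.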
        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused")
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        barrier_info.prev_epoch(),
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.state.inflight_graph_info.shared_actor_infos.upsert(
                        self.database_id,
                        job.graph_info()
                            .fragment_infos
                            .values()
                            .map(|fragment| (fragment, job.job_id)),
                    );

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

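        // A checkpoint barrier without an explicit command can piggyback either the
        // merge of caught-up snapshot-backfill jobs or the start of pending fragment
        // backfills.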
        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes =
                    self.create_mview_tracker.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let command = command;

        let (
            pre_applied_graph_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            prev_paused_reason,
        ) = self.state.apply_command(command.as_ref());

        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        for creating_job in self.creating_streaming_job_controls.values_mut() {
            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
        }

        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
            self.database_id,
            command.as_ref(),
            &barrier_info,
            prev_paused_reason,
            &pre_applied_graph_info,
            &self.state.inflight_graph_info,
            &mut edges,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}