use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::{SourceId, WorkerId};
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightStreamingJobInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, InflightSubscriptionInfo};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::{SourceChange, fill_snapshot_backfill_epoch};
use crate::{MetaError, MetaResult};

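/// Global checkpoint control on the meta node: one [`DatabaseCheckpointControlStatus`] per
/// database, plus the latest committed [`HummockVersionStats`] and the configured in-flight
/// barrier limit (`in_flight_barrier_nums`).
///
/// An illustrative, non-compiled sketch of how the barrier worker loop drives it (the real
/// driver lives outside this module):
///
/// ```ignore
/// let mut control = CheckpointControl::new(env);
/// // inject a scheduled barrier
/// control.handle_new_barrier(new_barrier, &mut control_stream_manager)?;
/// // record a worker's collection response
/// control.barrier_collected(resp, &mut periodic_barriers)?;
/// // pick up the next batch of completion work, if any
/// if let Some(task) = control.next_complete_barrier_task(None) {
///     // hand `task` to the completing task runner, then `ack_completed` its output
/// }
/// ```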
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

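    /// Applies a completed [`BarrierCompleteOutput`]: refreshes the cached Hummock version stats
    /// and acknowledges the completed epochs to each owning database, which must still be
    /// running.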
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for the completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

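    /// Scans every running database for barriers that are ready to be committed and merges the
    /// per-database work into a single [`CompleteBarrierTask`], if any.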
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = DatabaseId::new(resp.database_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

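    /// Handles a scheduled [`NewBarrier`]: resolves cross-database snapshot-backfill epochs,
    /// creates the database entry on the first normal `CreateStreamingJob`, skips or rejects
    /// commands that cannot apply, enforces the in-flight barrier limit for command-less
    /// barriers, and forwards the barrier to the owning [`DatabaseCheckpointControl`].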
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.state.inflight_graph_info.contains_job(*table_id)
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                            self.env.cdc_table_backfill_tracker(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but got command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .create_mview_tracker
                    .gen_ddl_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(
                    creating_job.job_id.table_id,
                    creating_job.gen_ddl_progress(),
                )]);
            }
        }
        progress
    }

    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
        self.databases.values_mut().for_each(|database| {
            if let Some(database) = database.running_state_mut() {
                database.create_mview_tracker.abort_all()
            }
        });
    }

    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<
        Item = (
            DatabaseId,
            &InflightSubscriptionInfo,
            impl Iterator<Item = TableId> + '_,
        ),
    > + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database
                .database_state()
                .map(|(database_state, creating_jobs)| {
                    (
                        *database_id,
                        &database_state.inflight_subscription_info,
                        creating_jobs
                            .values()
                            .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                    )
                })
        })
    }
}

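/// Events yielded by [`CheckpointControl::next_event`] when a recovering database is ready to
/// move to the next recovery stage.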
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = DatabaseId::new(resp.database_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

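/// Per-database status held by [`CheckpointControl`]: either running normally or going through
/// per-database recovery.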
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

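/// Barrier metrics reported per database, labelled by database id.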
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

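/// Barrier and checkpoint state for a single database: the ordered queue of in-flight command
/// barriers, the barrier currently being committed, jobs being created via snapshot backfill,
/// and the progress trackers consulted when building commit tasks.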
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

    /// In-flight and collected command barriers, keyed by their `prev_epoch` in epoch order.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The `prev_epoch` of the barrier whose completion is currently being awaited, if any.
    completing_barrier: Option<u64>,

    /// The most recently committed `prev_epoch` of this database, if any.
    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,

    create_mview_tracker: CreateMviewProgressTracker,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(
        database_id: DatabaseId,
        shared_actor_infos: SharedActorInfos,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(database_id, shared_actor_infos),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            create_mview_tracker: Default::default(),
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        create_mview_tracker: CreateMviewProgressTracker,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            create_mview_tracker,
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

    fn jobs_to_merge(
        &self,
    ) -> Option<HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>> {
        let mut table_ids_to_merge = HashMap::new();

        for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
                table_ids_to_merge.insert(
                    *table_id,
                    (
                        creating_streaming_job
                            .snapshot_backfill_upstream_tables
                            .clone(),
                        graph_info.clone(),
                    ),
                );
            }
        }
        if table_ids_to_merge.is_empty() {
            None
        } else {
            Some(table_ids_to_merge)
        }
    }

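    /// Enqueues an injected barrier together with its notifiers, the workers it still needs to
    /// be collected from, and the creating jobs it waits on, asserting that its `prev_epoch`
    /// matches the `curr_epoch` of the previously queued barrier.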
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<TableId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

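    /// Records a `BarrierCompleteResponse` from a worker: marks the worker as collected for the
    /// matching command barrier, or forwards the response to the owning creating streaming job,
    /// forcing a checkpoint in the next barrier once that job is ready to merge into upstream.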
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(
                        node.state
                            .node_to_collect
                            .remove(&(worker_id as _))
                            .is_some()
                    );
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    /// Returns whether a new barrier may be injected, i.e. the number of in-flight barriers is
    /// still below `in_flight_barrier_nums`.
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
    /// Collects, for each snapshot-backfilling job, the upstream log epoch it currently pins
    /// together with its set of upstream tables.
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<TableId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(table_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *table_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

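    /// Gathers the next batch of completion work for this database into `task`: creating-job
    /// epochs that are ready to commit, creating jobs that have just finished (their partial
    /// graphs are removed and folded into the waiting upstream barrier), and the earliest fully
    /// collected command barrier together with its commit info, finished jobs and notifiers.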
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (table_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*table_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs
                        .iter()
                        .map(|(table_id, _, _)| *table_id)
                        .collect(),
                );
            }
            for (table_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&table_id));
                debug!(epoch, ?table_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&table_id)
                    .expect("should exist");
                assert!(creating_streaming_job.is_finished());

                let mut source_backfill_fragments = HashMap::new();
                for info in creating_streaming_job.graph_info().fragment_infos() {
                    if let Some((source_id, upstream_source_fragment_id)) =
                        info.nodes.find_source_backfill()
                    {
                        source_backfill_fragments
                            .entry(source_id as SourceId)
                            .or_insert(BTreeSet::new())
                            .insert((info.fragment_id, upstream_source_fragment_id));
                    }
                }
                let source_change = if !source_backfill_fragments.is_empty() {
                    Some(SourceChange::CreateJobFinished {
                        finished_backfill_fragments: source_backfill_fragments,
                    })
                } else {
                    None
                };

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(table_id, (resps, source_change))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                let list_finished_source_ids: Vec<_> = node
                    .state
                    .resps
                    .iter()
                    .flat_map(|resp| &resp.list_finished_source_ids)
                    .cloned()
                    .collect::<HashSet<_>>()
                    .into_iter()
                    .collect();
                if !list_finished_source_ids.is_empty() {
                    let task = task.get_or_insert_default();
                    task.list_finished_source_ids
                        .extend(list_finished_source_ids);
                }
                let load_finished_source_ids: Vec<_> = node
                    .state
                    .resps
                    .iter()
                    .flat_map(|resp| &resp.load_finished_source_ids)
                    .cloned()
                    .collect::<HashSet<_>>()
                    .into_iter()
                    .collect();
                if !load_finished_source_ids.is_empty() {
                    let task = task.get_or_insert_default();
                    task.load_finished_source_ids
                        .extend(load_finished_source_ids);
                }
                let refresh_finished_table_ids: Vec<_> = node
                    .state
                    .resps
                    .iter()
                    .flat_map(|resp| &resp.refresh_finished_tables)
                    .cloned()
                    .collect::<HashSet<_>>()
                    .into_iter()
                    .collect();
                if !refresh_finished_table_ids.is_empty() {
                    let task = task.get_or_insert_default();
                    task.refresh_finished_table_ids
                        .extend(refresh_finished_table_ids);
                }

                let staging_commit_info = self.create_mview_tracker.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.command_ctx.barrier_info,
                    &node.state.resps,
                    hummock_version_stats,
                );
                let finished_cdc_backfill = self
                    .cdc_table_backfill_tracker
                    .apply_collected_command(&node.state.resps);
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    assert!(staging_commit_info.is_none());
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.create_mview_tracker.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info =
                    staging_commit_info.expect("should be Some for checkpoint");
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(job_id, (resps, source_change))| {
                        node.state.resps.extend(resps);
                        staging_commit_info
                            .finished_jobs
                            .push(TrackingJob::new(job_id.table_id as _, source_change));
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(finished_cdc_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (table_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&table_id].state_table_ids(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((table_id, epoch));
            }
        }
    }

    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(TableId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (table_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&table_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }
}

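/// Everything tracked for one enqueued barrier: its latency timer, collection state, command
/// context and notifiers.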
struct EpochNode {
    /// Timer for barrier latency, started when the barrier is enqueued.
    enqueue_time: HistogramTimer,

    /// Collection state of this barrier.
    state: BarrierEpochState,

    /// Context of the command carried by this barrier.
    command_ctx: CommandContext,
    /// Notifiers to be told when the barrier is collected and committed.
    notifiers: Vec<Notifier>,
}

#[derive(Debug)]
struct BarrierEpochState {
    /// Workers that still need to report collection of this barrier.
    node_to_collect: NodeToCollect,

    /// Responses collected from workers so far.
    resps: Vec<BarrierCompleteResponse>,

    /// Creating (snapshot backfill) jobs this barrier still waits on.
    creating_jobs_to_wait: HashSet<TableId>,

    /// Creating jobs finished at this barrier, with their responses and optional source change.
    finished_jobs: HashMap<TableId, (Vec<BarrierCompleteResponse>, Option<SourceChange>)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
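    /// Handles a new barrier for this database: rejects commands that conflict with in-progress
    /// snapshot-backfill jobs, fills backfill epochs for `CreateStreamingJob` commands, may turn
    /// an empty checkpoint barrier into a merge or start-fragment-backfill command, applies the
    /// command to the in-flight state, injects the barrier through the control stream, and
    /// enqueues it to await collection.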
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = job_to_cancel.table_id,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule when creating streaming job with snapshot backfill",
                    )
                    .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            // No barrier needs to be injected; notify the command as started and collected right away.
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .inflight_graph_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        barrier_info.prev_epoch(),
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.state.inflight_graph_info.shared_actor_infos.upsert(
                        self.database_id,
                        job.graph_info()
                            .fragment_infos
                            .values()
                            .map(|fragment| (fragment, job.job_id)),
                    );

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes =
                    self.create_mview_tracker.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let command = command;

        let (
            pre_applied_graph_info,
            pre_applied_subscription_info,
            table_ids_to_commit,
            jobs_to_wait,
            prev_paused_reason,
        ) = self.state.apply_command(command.as_ref());

        // Record the new epoch on the tracing spans.
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
        }

        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
            self.database_id,
            command.as_ref(),
            &barrier_info,
            prev_paused_reason,
            &pre_applied_graph_info,
            &self.state.inflight_graph_info,
            &mut edges,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Notify listeners that the barrier has been injected.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            pre_applied_subscription_info,
            table_ids_to_commit,
            command,
            span,
        );

        // Track the in-flight barrier until it is collected from all nodes.
        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}