use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcTableBackfillTrackerRef;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightStreamingJobInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, InflightSubscriptionInfo};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};

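/// Tracks barrier (checkpoint) progress for all databases on the meta node.
/// Each database is either `Running`, driven by its own [`DatabaseCheckpointControl`],
/// or `Recovering` after a failure.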
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

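    /// Rebuild the checkpoint control from recovered per-database state.
    /// Databases listed in `failed_databases` start in the `Recovering` state
    /// and are reset through the control stream manager.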
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

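    /// Poll every running database for the next barrier that is ready to be committed,
    /// merging the results into a single [`CompleteBarrierTask`].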
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = DatabaseId::new(resp.database_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.databases.values().all(|database| {
            database
                .running_state()
                .map(|database| database.can_inject_barrier(in_flight_barrier_nums))
                .unwrap_or(true)
        })
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

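    /// Handle a newly scheduled barrier: resolve the target database (creating it for a
    /// normal `CreateStreamingJob` command on an empty database), fill in the committed
    /// epochs for cross-database snapshot backfill, and inject the barrier into the database.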
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.state.inflight_graph_info.contains_job(*table_id)
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                            self.env.cdc_table_backfill_tracker(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            database.handle_new_barrier(
                None,
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .create_mview_tracker
                    .gen_ddl_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(
                    creating_job.job_id.table_id,
                    creating_job.gen_ddl_progress(),
                )]);
            }
        }
        progress
    }

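    /// Return the databases that need to enter recovery after `worker_id` fails, i.e. those
    /// whose in-flight barriers, running actors, or creating jobs are affected by the worker.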
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

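    /// Clear the in-flight barrier queues on error: notify all pending notifiers of the
    /// failure, stop their latency timers, and abort all in-progress materialized view
    /// tracking.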
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
        self.databases.values_mut().for_each(|database| {
            if let Some(database) = database.running_state_mut() {
                database.create_mview_tracker.abort_all()
            }
        });
    }

    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<
        Item = (
            DatabaseId,
            &InflightSubscriptionInfo,
            impl Iterator<Item = TableId> + '_,
        ),
    > + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database
                .database_state()
                .map(|(database_state, creating_jobs)| {
                    (
                        *database_id,
                        &database_state.inflight_subscription_info,
                        creating_jobs
                            .values()
                            .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                    )
                })
        })
    }
}

pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = DatabaseId::new(resp.database_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

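    /// Wait for the next recovery state transition among all recovering databases,
    /// yielding either an `EnteringInitializing` or an `EnteringRunning` action.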
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

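/// Checkpoint status of a single database: running normally, or recovering after a failure.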
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

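/// Controls barrier checkpointing for a single database: the queue of in-flight barriers,
/// the barrier currently being committed, and the snapshot-backfill jobs that run on their
/// own partial graphs.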
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

    /// In-flight barriers, keyed by their `prev_epoch`, in injection order.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The `prev_epoch` of the barrier that is currently being committed, if any.
    completing_barrier: Option<u64>,

    /// The `prev_epoch` of the latest committed barrier.
    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,

    create_mview_tracker: CreateMviewProgressTracker,
    cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(
        database_id: DatabaseId,
        shared_actor_infos: SharedActorInfos,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(database_id, shared_actor_infos),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            create_mview_tracker: Default::default(),
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        create_mview_tracker: CreateMviewProgressTracker,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
        cdc_table_backfill_tracker: CdcTableBackfillTrackerRef,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            create_mview_tracker,
            cdc_table_backfill_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

    fn jobs_to_merge(
        &self,
    ) -> Option<HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>> {
        let mut table_ids_to_merge = HashMap::new();

        for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
                table_ids_to_merge.insert(
                    *table_id,
                    (
                        creating_streaming_job
                            .snapshot_backfill_upstream_tables
                            .clone(),
                        graph_info.clone(),
                    ),
                );
            }
        }
        if table_ids_to_merge.is_empty() {
            None
        } else {
            Some(table_ids_to_merge)
        }
    }

    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<TableId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

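    /// Record a `BarrierCompleteResponse` from a worker. For the database's own graph this
    /// marks the worker as collected for the corresponding epoch; for a creating job's
    /// partial graph it is forwarded to the job, forcing a checkpoint on the next barrier
    /// once the job is ready to merge back into the upstream graph.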
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(
                        node.state
                            .node_to_collect
                            .remove(&(worker_id as _))
                            .is_some()
                    );
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

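    /// Whether a new barrier may be injected, i.e. the number of in-flight barriers is
    /// still below `in_flight_barrier_nums`.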
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<TableId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(table_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *table_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

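    /// Move barriers that have been fully collected into `task` so they can be committed:
    /// finished snapshot-backfill jobs are folded into their upstream barrier, non-checkpoint
    /// barriers are acknowledged directly, and a checkpoint is forced when finished jobs are
    /// still waiting for one.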
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (table_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*table_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs
                        .iter()
                        .map(|(table_id, _, _)| *table_id)
                        .collect(),
                );
            }
            for (table_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&table_id));
                debug!(epoch, ?table_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&table_id)
                    .expect("should exist");
                assert!(creating_streaming_job.is_finished());
                assert!(epoch_state.finished_jobs.insert(table_id, resps).is_none());
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                let load_finished_source_ids: Vec<_> = node
                    .state
                    .resps
                    .iter()
                    .flat_map(|resp| &resp.load_finished_source_ids)
                    .cloned()
                    .collect();
                if !load_finished_source_ids.is_empty() {
                    let task = task.get_or_insert_default();
                    task.load_finished_source_ids
                        .extend(load_finished_source_ids);
                }

                let mut finished_jobs = self.create_mview_tracker.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.command_ctx.barrier_info,
                    &node.state.resps,
                    hummock_version_stats,
                );
                let finished_cdc_backfill = self
                    .cdc_table_backfill_tracker
                    .apply_collected_command(&node.state.resps);
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    assert!(finished_jobs.is_empty());
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.create_mview_tracker.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(job_id, resps)| {
                        node.state.resps.extend(resps);
                        finished_jobs.push(TrackingJob::New(TrackingCommand { job_id }));
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(finished_cdc_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (table_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&table_id].state_table_ids(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((table_id, epoch));
            }
        }
    }

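    /// Acknowledge that the completing barrier (and the given epochs of creating jobs) has
    /// been committed, recording the newly committed epoch.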
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(TableId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (table_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&table_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }
}

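/// A barrier in the queue, together with its collection state, command context, latency
/// timer and the notifiers to call once it completes.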
struct EpochNode {
    /// Timer started when the barrier is enqueued, used to record barrier latency.
    enqueue_time: HistogramTimer,

    /// Collection state of the barrier.
    state: BarrierEpochState,

    /// Context of the command carried by this barrier.
    command_ctx: CommandContext,
    /// Notifiers to be called when the barrier is collected or fails.
    notifiers: Vec<Notifier>,
}

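/// Collection state of an in-flight barrier.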
#[derive(Debug)]
struct BarrierEpochState {
    /// Workers that still have to report collection of this barrier.
    node_to_collect: NodeToCollect,

    /// Responses collected so far from the workers.
    resps: Vec<BarrierCompleteResponse>,

    /// Creating (snapshot-backfill) jobs whose completion this barrier must wait for.
    creating_jobs_to_wait: HashSet<TableId>,

    /// Responses of creating jobs that finished at this barrier.
    finished_jobs: HashMap<TableId, Vec<BarrierCompleteResponse>>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
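    /// Handle a new barrier for this database: apply the command to the in-flight state,
    /// inject the barrier to the workers via the control stream manager, and enqueue it to
    /// await collection.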
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for job_to_cancel in command
            .as_ref()
            .map(Command::jobs_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&job_to_cancel)
            {
                warn!(
                    job_id = job_to_cancel.table_id,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule when creating streaming job with snapshot backfill",
                    )
                    .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .inflight_graph_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        barrier_info.prev_epoch(),
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.state
                        .inflight_graph_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.graph_info().fragment_infos.values());

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes =
                    self.create_mview_tracker.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let command = command;

        let (
            pre_applied_graph_info,
            pre_applied_subscription_info,
            table_ids_to_commit,
            jobs_to_wait,
            prev_paused_reason,
        ) = self.state.apply_command(command.as_ref());

        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
        }

        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
            self.database_id,
            command.as_ref(),
            &barrier_info,
            prev_paused_reason,
            &pre_applied_graph_info,
            &self.state.inflight_graph_info,
            &mut edges,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            pre_applied_subscription_info,
            table_ids_to_commit.clone(),
            command,
            span,
        );

        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}