use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::InflightStreamingJobInfo;
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, InflightSubscriptionInfo};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};

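/// Global barrier/checkpoint controller: tracks the barrier state of every database, each of
/// which is either `Running` with its own [`DatabaseCheckpointControl`] or `Recovering`.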
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

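    /// Rebuild the controller after recovery: recovered databases resume as `Running`, while the
    /// databases in `failed_databases` re-enter the resetting state.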
    pub(crate) fn recover(
        databases: impl IntoIterator<Item = (DatabaseId, DatabaseCheckpointControl)>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

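    /// Apply the output of a completed barrier task: refresh the cached hummock version stats and
    /// acknowledge the completed epochs to each involved database.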
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

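    /// Collect the next barrier-completion task across all running databases, if any barrier is
    /// ready to be committed.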
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

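    /// Route a `BarrierCompleteResponse` from a worker to the owning database, whether it is
    /// currently running or recovering.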
    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = DatabaseId::new(resp.database_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.databases.values().all(|database| {
            database
                .running_state()
                .map(|database| database.can_inject_barrier(in_flight_barrier_nums))
                .unwrap_or(true)
        })
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.state.inflight_graph_info.contains_job(*table_id)
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(database_id);
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            database.handle_new_barrier(
                None,
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

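    /// Aggregate DDL progress of all running databases, from both the materialized-view tracker
    /// and the per-job controls of creating streaming jobs.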
    pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .create_mview_tracker
                    .gen_ddl_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(
                    creating_job.job_id.table_id,
                    creating_job.gen_ddl_progress(),
                )]);
            }
        }
        progress
    }

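    /// Return the databases that can no longer make progress after the given worker fails and
    /// therefore have to enter recovery.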
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

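    /// Fail all barriers still queued in running databases with the given error and abort all
    /// tracked materialized-view creations.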
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
        self.databases.values_mut().for_each(|database| {
            if let Some(database) = database.running_state_mut() {
                database.create_mview_tracker.abort_all()
            }
        });
    }

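    /// Iterate over the in-flight subscription info of every database, together with the ids of
    /// creating jobs that are still consuming their upstream.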
    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<
        Item = (
            DatabaseId,
            &InflightSubscriptionInfo,
            impl Iterator<Item = TableId> + '_,
        ),
    > + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database
                .database_state()
                .map(|(database_state, creating_jobs)| {
                    (
                        *database_id,
                        &database_state.inflight_subscription_info,
                        creating_jobs
                            .values()
                            .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                    )
                })
        })
    }
}

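/// Events emitted by [`CheckpointControl::next_event`] when a recovering database is ready to
/// move to its next recovery stage.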
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = DatabaseId::new(resp.database_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

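    /// Wait for the next recovery-related event from any recovering database. The returned future
    /// stays pending while every database is running.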
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

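/// Per-database status: either running normally with a [`DatabaseCheckpointControl`], or going
/// through recovery with a [`DatabaseRecoveringState`].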
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

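/// Barrier state machine of a single database. `command_ctx_queue` keeps the in-flight barriers
/// ordered by their `prev_epoch`, `completing_barrier` holds the `prev_epoch` of the barrier
/// currently being committed, and `creating_streaming_job_controls` tracks snapshot-backfill jobs
/// running on their own partial graphs.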
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

    command_ctx_queue: BTreeMap<u64, EpochNode>,
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,

    create_mview_tracker: CreateMviewProgressTracker,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(database_id: DatabaseId) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            create_mview_tracker: Default::default(),
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        create_mview_tracker: CreateMviewProgressTracker,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            create_mview_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

    fn jobs_to_merge(
        &self,
    ) -> Option<HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>> {
        let mut table_ids_to_merge = HashMap::new();

        for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
                table_ids_to_merge.insert(
                    *table_id,
                    (
                        creating_streaming_job
                            .snapshot_backfill_upstream_tables
                            .clone(),
                        graph_info.clone(),
                    ),
                );
            }
        }
        if table_ids_to_merge.is_empty() {
            None
        } else {
            Some(table_ids_to_merge)
        }
    }

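    /// Enqueue an injected barrier: start its latency timer, assert that its `prev_epoch` follows
    /// the last queued barrier, and record which nodes and creating jobs it still waits for.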
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<TableId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

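    /// Record a collect response from one worker. Responses for the database's own partial graph
    /// are attached to the matching queued barrier; responses for a creating job's partial graph
    /// are forwarded to that job, possibly forcing a checkpoint so it can merge into the upstream
    /// graph.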
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(
                        node.state
                            .node_to_collect
                            .remove(&(worker_id as _))
                            .is_some()
                    );
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<TableId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(table_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *table_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

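    /// Extend `task` with work that is ready to be committed for this database: epochs of creating
    /// jobs that can start completing, finished snapshot-backfill jobs, and the earliest fully
    /// collected barrier in the queue (at most one command barrier completes at a time).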
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (table_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*table_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs
                        .iter()
                        .map(|(table_id, _, _)| *table_id)
                        .collect(),
                );
            }
            for (table_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&table_id));
                debug!(epoch, ?table_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&table_id)
                    .expect("should exist");
                assert!(creating_streaming_job.is_finished());
                assert!(epoch_state.finished_jobs.insert(table_id, resps).is_none());
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());
                let mut finished_jobs = self.create_mview_tracker.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.command_ctx.barrier_info,
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    assert!(finished_jobs.is_empty());
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.create_mview_tracker.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(job_id, resps)| {
                        node.state.resps.extend(resps);
                        finished_jobs.push(TrackingJob::New(TrackingCommand {
                            job_id,
                            replace_stream_job: None,
                        }));
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(finished_jobs);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (table_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&table_id].state_table_ids(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((table_id, epoch));
            }
        }
    }

    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(TableId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (table_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&table_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }
}

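/// State and messages of one in-flight barrier: the latency timer started at enqueue time, the
/// collection state, the command context and the notifiers to wake on completion.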
struct EpochNode {
    enqueue_time: HistogramTimer,

    state: BarrierEpochState,

    command_ctx: CommandContext,
    notifiers: Vec<Notifier>,
}

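/// Collection state of a single barrier: the nodes and creating jobs it still waits for, plus the
/// responses gathered so far and those of jobs that finished at this barrier.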
#[derive(Debug)]
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<TableId>,

    finished_jobs: HashMap<TableId, Vec<BarrierCompleteResponse>>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
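    /// Inject a new barrier into this database: validate the command against ongoing
    /// snapshot-backfill jobs, advance the barrier state, inject the barrier via the control
    /// stream manager, and enqueue the resulting command context for collection.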
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for table_to_cancel in command
            .as_ref()
            .map(Command::tables_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&table_to_cancel)
            {
                warn!(
                    table_id = table_to_cancel.table_id,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule when creating streaming job with snapshot backfill",
                    )
                    .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .inflight_graph_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    self.creating_streaming_job_controls.insert(
                        job_id,
                        CreatingStreamingJobControl::new(
                            info,
                            snapshot_backfill_upstream_tables,
                            barrier_info.prev_epoch(),
                            hummock_version_stats,
                            control_stream_manager,
                            edges.as_mut().expect("should exist"),
                        )?,
                    );
                }
            }
        }

        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes =
                    self.create_mview_tracker.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let command = command;

        let (
            pre_applied_graph_info,
            pre_applied_subscription_info,
            table_ids_to_commit,
            jobs_to_wait,
            prev_paused_reason,
        ) = self.state.apply_command(command.as_ref());

        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
        }

        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
            self.database_id,
            command.as_ref(),
            &barrier_info,
            prev_paused_reason,
            &pre_applied_graph_info,
            &self.state.inflight_graph_info,
            &mut edges,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            pre_applied_subscription_info,
            table_ids_to_commit.clone(),
            command,
            span,
        );

        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}