use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcProgress;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BackfillProgress, Command, CreateStreamingJobType};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};

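/// Controls barrier checkpointing across all databases, tracking each
/// database's checkpoint status (running or recovering) independently.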
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

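    /// Rebuilds the checkpoint control after recovery: recovered databases
    /// enter the running status, while failed databases (together with their
    /// creating jobs) enter the resetting stage of recovery.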
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<JobId>>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(
                    failed_databases
                        .into_iter()
                        .map(|(database_id, creating_jobs)| {
                            (
                                database_id,
                                DatabaseCheckpointControlStatus::Recovering(
                                    DatabaseRecoveringState::new_resetting(
                                        database_id,
                                        creating_jobs.into_iter(),
                                        control_stream_manager,
                                    ),
                                ),
                            )
                        }),
                )
                .collect(),
            hummock_version_stats,
        }
    }

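    /// Acknowledges a completed barrier: records the latest hummock version
    /// stats and acks the completed epochs on each affected database.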
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

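    /// Collects the next barrier completion task, if any, by polling every
    /// running database for fully collected barriers.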
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

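    /// Routes a `BarrierCompleteResponse` from a worker to the running or
    /// recovering state of the database that owns the partial graph.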
    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

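    /// Handles a newly scheduled barrier: resolves cross-database snapshot
    /// epochs for the command (if any), creates the database entry for the
    /// first streaming job of a new database, and injects the barrier.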
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross-db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross-db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    Command::RescheduleFragment { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for a normal creating streaming job, but got command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database does not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} does not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

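    /// Aggregates backfill progress from every running database, covering both
    /// jobs in the database info and snapshot-backfill creating jobs.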
    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(creating_job.job_id, creating_job.gen_backfill_progress())]);
            }
        }
        progress
    }

    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

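    /// Returns the databases that can no longer make progress after an error
    /// on the given worker and therefore need to enter recovery.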
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .database_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

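    /// Drains the command queue of every running database on a global error,
    /// failing the notifiers of all pending barriers.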
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }

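    /// Iterates over all databases and, for each, the creating jobs that are
    /// still consuming upstream data.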
    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

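/// Events yielded by [`CheckpointControl::next_event`] when a recovering
/// database is ready to move to the next recovery stage.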
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
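    /// Handles a worker's response to a reset-partial-graph request, for either
    /// a creating job of a running database or a recovering database.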
    pub(crate) fn on_reset_partial_graph_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetPartialGraphResponse,
    ) {
        let (database_id, creating_job) = from_partial_graph_id(resp.partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(creating_job_id) = creating_job {
                    let Entry::Occupied(mut entry) = database
                        .creating_streaming_job_controls
                        .entry(creating_job_id)
                    else {
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job: {resp:?}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            %worker_id,
                            ?resp,
                            "ignore reset partial graph resp on non-existing creating job on running database"
                        );
                        return;
                    };
                    if entry.get_mut().on_reset_partial_graph_resp(worker_id, resp) {
                        entry.remove();
                    }
                } else {
                    unreachable!("should not receive reset database resp when database is running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_partial_graph_resp(worker_id, resp)
            }
        }
    }

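    /// Returns a future that resolves once any recovering database is ready to
    /// transition, either into initializing or back into running; it stays
    /// pending while no database has a pending state change.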
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

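/// Checkpoint status of a single database: either running normally or going
/// through recovery.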
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be in running state: {}", reason)
            }
        }
    }
}

struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

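/// Controls the barrier checkpointing of a single database.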
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    pub(super) state: BarrierWorkerState,

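    /// In-flight and collected barriers in epoch order, keyed by `prev_epoch`.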
    command_ctx_queue: BTreeMap<u64, EpochNode>,
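    /// The `prev_epoch` of the barrier whose completion is in progress, if any.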
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    pub creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            creating_streaming_job_controls: Default::default(),
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            creating_streaming_job_controls,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

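    /// Enqueues an injected barrier, asserting that its `prev_epoch` chains
    /// onto the `curr_epoch` of the previously enqueued barrier.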
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

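    /// Records a barrier collected from a worker. When the barrier belongs to a
    /// creating job that becomes ready to merge into the upstream graph, a
    /// checkpoint is forced in the next barrier.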
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            %worker_id,
            prev_epoch,
            partial_graph_id = %resp.partial_graph_id,
            "barrier collected"
        );
        let (database_id, creating_job_id) = from_partial_graph_id(resp.partial_graph_id);
        assert_eq!(self.database_id, database_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(node.state.node_to_collect.remove(&worker_id));
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

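    /// Whether a new barrier may be injected given the in-flight barrier limit.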
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values() {
            if !is_valid_after_worker_err(&epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
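    /// Returns, for each snapshot-backfill creating job, its pinned upstream
    /// log epoch together with the upstream tables it pins.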
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .map(|(job_id, creating_job)| {
                let progress_epoch = creating_job.pinned_upstream_log_epoch();
                (
                    *job_id,
                    (
                        progress_epoch,
                        creating_job.snapshot_backfill_upstream_tables.clone(),
                    ),
                )
            })
            .collect()
    }

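    /// Stages completable work into `task`: first the completable epochs of
    /// creating jobs, then fully collected barriers popped from the queue
    /// front, stopping after the first checkpoint barrier is staged.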
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        if let Some(committed_epoch) = self.committed_epoch {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier, committed_epoch)
                {
                    let create_info = match status {
                        CompleteJobType::First(info) => Some(info),
                        CompleteJobType::Normal => None,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, create_info));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs.iter().map(|(job_id, _, _)| *job_id).collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
            assert!(node.state.creating_jobs_to_wait.is_empty());
            assert!(node.state.node_to_collect.is_empty());

            self.handle_refresh_table_info(task, &node);

            self.database_info.apply_collected_command(
                &node.command_ctx.command,
                &node.state.resps,
                hummock_version_stats,
            );
            if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                node.notifiers.into_iter().for_each(|notifier| {
                    notifier.notify_collected();
                });
                if let Some((periodic_barriers, _)) = &mut context
                    && self.database_info.has_pending_finished_jobs()
                    && self
                        .command_ctx_queue
                        .values()
                        .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
                continue;
            }
            let mut staging_commit_info = self.database_info.take_staging_commit_info();
            node.state
                .finished_jobs
                .drain()
                .for_each(|(_job_id, (resps, tracking_job))| {
                    node.state.resps.extend(resps);
                    staging_commit_info.finished_jobs.push(tracking_job);
                });
            let task = task.get_or_insert_default();
            node.command_ctx.collect_commit_epoch_info(
                &mut task.commit_info,
                take(&mut node.state.resps),
                self.collect_backfill_pinned_upstream_log_epoch(),
            );
            self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
            task.finished_jobs.extend(staging_commit_info.finished_jobs);
            task.finished_cdc_table_backfill
                .extend(staging_commit_info.finished_cdc_table_backfill);
            task.notifiers.extend(node.notifiers);
            task.epoch_infos
                .try_insert(
                    self.database_id,
                    (Some((node.command_ctx, node.enqueue_time)), vec![]),
                )
                .expect("non duplicate");
            task.commit_info
                .truncate_tables
                .extend(staging_commit_info.table_ids_to_truncate);
            break;
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, create_info) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    create_info.as_ref(),
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch, create_info));
            }
        }
    }

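    /// Acknowledges that the completing barrier (if any) has been committed,
    /// advancing `committed_epoch`, and acks the completed epochs of creating
    /// jobs.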
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        if let Some(prev_epoch) = self.completing_barrier.take() {
            assert_eq!(command_prev_epoch, Some(prev_epoch));
            self.committed_epoch = Some(prev_epoch);
        } else {
            assert_eq!(command_prev_epoch, None);
        }
        for (job_id, epoch) in creating_job_epochs {
            self.creating_streaming_job_controls
                .get_mut(&job_id)
                .expect("should exist")
                .ack_completed(epoch)
        }
    }

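    /// Extracts list-finished/load-finished sources and refresh-finished tables
    /// from the barrier responses into the completion task.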
    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

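/// The per-epoch state and messages of one enqueued barrier.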
struct EpochNode {
    enqueue_time: HistogramTimer,

    state: BarrierEpochState,

    command_ctx: CommandContext,
    notifiers: Vec<Notifier>,
}

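/// Collection state of a barrier: the workers and creating jobs still pending,
/// plus the responses collected so far.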
#[derive(Debug)]
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<JobId>,

    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
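    /// Validates a scheduled command against the database's current state
    /// (rejecting e.g. multi-job cancel/throttle or reschedule while snapshot
    /// backfill jobs are creating), then applies it and enqueues the barrier.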
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                for job_to_cancel in streaming_job_ids {
                    if self
                        .creating_streaming_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel a creating snapshot backfill streaming job together with other jobs; \
                                the job will continue creating until it is created or recovery happens. Please cancel the snapshot backfill job in a single DDL").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(creating_job) =
                    self.creating_streaming_job_controls.get_mut(job_to_drop)
                && creating_job.drop(&mut notifiers, control_stream_manager)
            {
                return Ok(());
            }
        }

        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(creating_job_id) = jobs
                .iter()
                .find(|job| self.creating_streaming_job_controls.contains_key(job))
        {
            warn!(
                job_id = %creating_job_id,
                "ignore multi-job throttle command on creating snapshot backfill streaming job"
            );
            for notifier in notifiers {
                notifier
                    .notify_start_failed(anyhow!("cannot alter the rate limit of a snapshot backfill streaming job together with other jobs; \
                    the original rate limit will be kept until recovery.").into());
            }
            return Ok(());
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule when creating streaming job with snapshot backfill"
                    )
                    .into(),
                );
            }
            return Ok(());
        }

        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill jobs when there is no normal job in the database"
            );
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        }

        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused")
                        .into(),
                );
            }
            return Ok(());
        }

        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        let ApplyCommandInfo {
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            node_to_collect,
            command,
        } = match self.apply_command(
            command,
            &mut notifiers,
            &barrier_info,
            control_stream_manager,
            hummock_version_stats,
        ) {
            Ok(info) => info,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}