use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
use tracing::{debug, warn};

use crate::barrier::cdc_progress::CdcProgress;
use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::TrackingJob;
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::{MetaError, MetaResult};

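/// Top-level state for barrier checkpointing on the meta node: one
/// [`DatabaseCheckpointControlStatus`] per database, so that each database
/// checkpoints and recovers independently, plus the most recently committed
/// [`HummockVersionStats`].
///
/// An illustrative lifecycle sketch (the `env` binding and the commit step
/// are placeholders, not the real call sites):
///
/// ```ignore
/// let mut control = CheckpointControl::new(env);
/// // ... inject barriers and collect responses ...
/// if let Some(task) = control.next_complete_barrier_task(None) {
///     // hand `task` to the completing worker, then call `control.ack_completed(output)`
/// }
/// ```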
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
    pub(crate) in_flight_barrier_nums: usize,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

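    /// Rebuild the control state after recovery: databases recovered as
    /// running are resumed directly, while failed databases re-enter the
    /// resetting state together with their creating jobs. Shared actor info
    /// is pruned to the databases that still exist.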
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<JobId>>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(
                    failed_databases
                        .into_iter()
                        .map(|(database_id, creating_jobs)| {
                            (
                                database_id,
                                DatabaseCheckpointControlStatus::Recovering(
                                    DatabaseRecoveringState::new_resetting(
                                        database_id,
                                        creating_jobs.into_iter(),
                                        control_stream_manager,
                                    ),
                                ),
                            )
                        }),
                )
                .collect(),
            hummock_version_stats,
        }
    }

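    /// Acknowledge that a completed barrier has been committed: record the
    /// new version stats and forward the acked epochs to each owning database.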
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running(
                    "should have waited for the completing command before entering recovery",
                )
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

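    /// Scan every running database and gather the next batch of collected
    /// barriers into a single [`CompleteBarrierTask`], if any are ready to be
    /// committed.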
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

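    /// Route a `BarrierCompleteResponse` to the database it belongs to, as
    /// identified by its partial graph id. Responses for recovering databases
    /// are handled by the recovery state machine instead.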
    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(resp.partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
        self.databases
            .get(&database_id)
            .and_then(|database| database.running_state())
            .map(|database| &database.database_info)
    }

    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.databases
            .values()
            .any(|database| database.may_have_snapshot_backfilling_jobs())
    }

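    /// Handle a scheduled [`NewBarrier`]: resolve snapshot epochs for
    /// cross-db backfill upstreams, lazily create the per-database control
    /// state on the first normal `CreateStreamingJob`, and then inject the
    /// barrier, applying the in-flight limit to command-less barriers.
    ///
    /// A minimal sketch of the driving loop (the `scheduled_barriers` and
    /// `control_stream_manager` bindings are illustrative, not the real call
    /// sites):
    ///
    /// ```ignore
    /// while let Some(new_barrier) = scheduled_barriers.next() {
    ///     checkpoint_control.handle_new_barrier(new_barrier, &mut control_stream_manager)?;
    /// }
    /// ```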
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross-db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross-db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for a normal creating streaming job, but got command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database does not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} does not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // No graph exists for this database yet; nothing to inject.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // The database is recovering; skip periodic barriers for now.
                return Ok(());
            };
            if !database.can_inject_barrier(self.in_flight_barrier_nums) {
                // Too many in-flight barriers; apply back-pressure.
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(creating_job.job_id, creating_job.gen_backfill_progress())]);
            }
        }
        progress
    }

    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        let mut progress = Vec::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_fragment_backfill_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend(creating_job.gen_fragment_backfill_progress());
            }
        }
        progress
    }

    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

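    /// After a worker error, return the databases that can no longer make
    /// progress and therefore need recovery: a database fails if an in-flight
    /// barrier, its inflight graph info, or one of its creating jobs still
    /// depends on the failed worker.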
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .database_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
    }

    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<Item = (DatabaseId, impl Iterator<Item = JobId> + '_)> + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database.database_state().map(|(_, creating_jobs)| {
                (
                    *database_id,
                    creating_jobs
                        .values()
                        .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                )
            })
        })
    }
}

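/// A state transition produced by [`CheckpointControl::next_event`] for a
/// recovering database: either it is ready to start initializing, or it has
/// finished recovering and can re-enter the running state.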
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_partial_graph_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetPartialGraphResponse,
    ) {
        let (database_id, creating_job) = from_partial_graph_id(resp.partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(creating_job_id) = creating_job {
                    let Entry::Occupied(mut entry) = database
                        .creating_streaming_job_controls
                        .entry(creating_job_id)
                    else {
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job: {resp:?}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            %worker_id,
                            ?resp,
                            "ignore reset partial graph resp on non-existing creating job on running database"
                        );
                        return;
                    };
                    if entry.get_mut().on_reset_partial_graph_resp(worker_id, resp) {
                        entry.remove();
                    }
                } else {
                    unreachable!("should not receive reset database resp when database is running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_partial_graph_resp(worker_id, resp)
            }
        }
    }

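    /// Return a future resolving to the next recovery event of any recovering
    /// database. `self` is wrapped in an `Option` and taken on readiness so
    /// that the event returned from inside `poll_fn` can borrow `self` for
    /// the full output lifetime.
    ///
    /// An illustrative await site (a sketch, not the real caller):
    ///
    /// ```ignore
    /// match checkpoint_control.next_event().await {
    ///     CheckpointControlEvent::EnteringInitializing(action) => { /* apply the reset */ }
    ///     CheckpointControlEvent::EnteringRunning(action) => { /* resume the database */ }
    /// }
    /// ```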
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after returning `Poll::Ready`")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

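/// The checkpointing status of a single database: either running normally or
/// going through recovery.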
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        // Conservatively report `true` while recovering, since the set of
        // creating jobs is not known until recovery finishes.
        self.running_state()
            .map(|database| !database.creating_streaming_job_controls.is_empty())
            .unwrap_or(true)
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<JobId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

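/// Per-database barrier metrics, label-guarded by database id.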
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

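/// Controls the concurrent execution of in-flight barriers for a single
/// database.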
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    pub(super) state: BarrierWorkerState,

    /// In-flight barriers in injection order, keyed by their `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The `prev_epoch` of the barrier that is currently being committed, if any.
    completing_barrier: Option<u64>,

    /// The `prev_epoch` of the most recently committed barrier, if any.
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    pub creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            creating_streaming_job_controls: Default::default(),
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        state: BarrierWorkerState,
        committed_epoch: u64,
        database_info: InflightDatabaseInfo,
        creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            database_info,
            creating_streaming_job_controls,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

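    /// Enqueue an injected barrier to await collection, asserting that its
    /// `prev_epoch` continues from the previous barrier's `curr_epoch`, and
    /// start its latency timer.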
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<JobId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

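    /// Record a collected barrier response. Responses for the database's own
    /// graph are attached to the matching in-flight barrier; responses for a
    /// creating job are forwarded to its control, which may request a forced
    /// checkpoint so the job can merge back into the upstream graph.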
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            %worker_id,
            prev_epoch,
            partial_graph_id = %resp.partial_graph_id,
            "barrier collected"
        );
        let (database_id, creating_job_id) = from_partial_graph_id(resp.partial_graph_id);
        assert_eq!(self.database_id, database_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(node.state.node_to_collect.remove(&worker_id));
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

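    /// Whether a new barrier may be injected, i.e. the number of in-flight
    /// (not yet fully collected) barriers is below `in_flight_barrier_nums`.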
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

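    /// Whether this database can keep running after the given worker failed:
    /// true only if no in-flight barrier is still waiting to collect from it.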
    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values() {
            if !is_valid_after_worker_err(&epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .map(|(job_id, creating_job)| {
                let progress_epoch = creating_job.pinned_upstream_log_epoch();
                (
                    *job_id,
                    (
                        progress_epoch,
                        creating_job.snapshot_backfill_upstream_tables.clone(),
                    ),
                )
            })
            .collect()
    }

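    /// Pop collected barriers from the front of the queue into `task` for
    /// committing: first drain completable epochs of creating jobs (removing
    /// the partial graphs of jobs that just finished), then complete queued
    /// barriers up to and including the first checkpoint barrier. At most one
    /// checkpoint barrier per database completes at a time.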
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        // Creating jobs can only start completing once the database has a committed epoch.
        if let Some(committed_epoch) = self.committed_epoch {
            let mut finished_jobs = Vec::new();
            // The earliest in-flight barrier bounds how far creating jobs may complete.
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier, committed_epoch)
                {
                    let create_info = match status {
                        CompleteJobType::First(info) => Some(info),
                        CompleteJobType::Normal => None,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*job_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, create_info));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs.iter().map(|(job_id, _, _)| *job_id).collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&job_id));
                debug!(epoch, %job_id, "finish creating job");
                // The finished job's control state can be removed now and turned
                // into a tracking job to be committed with this barrier.
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();

                assert!(
                    epoch_state
                        .finished_jobs
                        .insert(job_id, (resps, tracking_job))
                        .is_none()
                );
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                self.handle_refresh_table_info(task, &node);

                self.database_info.apply_collected_command(
                    &node.command_ctx.command,
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.database_info.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                let mut staging_commit_info = self.database_info.take_staging_commit_info();
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(_job_id, (resps, tracking_job))| {
                        node.state.resps.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, create_info) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&job_id]
                        .state_table_ids()
                        .iter()
                        .copied(),
                    create_info.as_ref(),
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((job_id, epoch, create_info));
            }
        }
    }

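    /// Mark the completing barrier (if any) as committed, record its
    /// `prev_epoch` as the database's `committed_epoch`, and forward per-job
    /// acks to the creating jobs.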
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&job_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }

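    /// Collect refresh-related responses carried by a collected barrier
    /// (finished list/load sources and refresh-finished tables) into the
    /// pending complete-barrier task.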
    fn handle_refresh_table_info(&self, task: &mut Option<CompleteBarrierTask>, node: &EpochNode) {
        let list_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = node
            .state
            .resps
            .iter()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = node
            .state
            .resps
            .iter()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}

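/// The per-epoch state of one in-flight barrier in the queue.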
struct EpochNode {
    /// Timer recording the barrier latency; observed when the barrier completes.
    enqueue_time: HistogramTimer,

    /// Collection state of this barrier.
    state: BarrierEpochState,

    /// Context of the command carried by this barrier.
    command_ctx: CommandContext,
    /// Notifiers to call back as the barrier is collected and committed.
    notifiers: Vec<Notifier>,
}

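/// Tracks what a queued barrier is still waiting on before it can complete.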
#[derive(Debug)]
struct BarrierEpochState {
    /// Worker nodes that have not yet collected this barrier.
    node_to_collect: NodeToCollect,

    /// Responses collected so far from worker nodes.
    resps: Vec<BarrierCompleteResponse>,

    /// Creating jobs whose progress this barrier still waits for.
    creating_jobs_to_wait: HashSet<JobId>,

    /// Jobs that finished at this barrier, with their responses and tracking state.
    finished_jobs: HashMap<JobId, (Vec<BarrierCompleteResponse>, TrackingJob)>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
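    /// Inject a new barrier (optionally carrying a command) into this
    /// database: validate the command against in-progress snapshot-backfill
    /// jobs, apply it to the inflight graph, inject the barrier via the
    /// control stream, and enqueue it to await collection.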
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                for job_to_cancel in streaming_job_ids {
                    if self
                        .creating_streaming_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel a creating snapshot backfill streaming job together with other jobs; \
                                the job will continue creating until it is created or recovery occurs. Please cancel the snapshot backfilling job in a single DDL").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                && let Some(creating_job) =
                    self.creating_streaming_job_controls.get_mut(job_to_drop)
                && creating_job.drop(&mut notifiers, control_stream_manager)
            {
                return Ok(());
            }
        }

        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(creating_job_id) = jobs
                .iter()
                .find(|job| self.creating_streaming_job_controls.contains_key(*job))
        {
            warn!(
                job_id = %creating_job_id,
                "ignore multi-job throttle command on creating snapshot backfill streaming job"
            );
            for notifier in notifiers {
                notifier
                    .notify_start_failed(anyhow!("cannot alter the rate limit of a creating snapshot backfill streaming job together with other jobs; \
                    the original rate limit will be kept across recovery").into());
            }
            return Ok(());
        };

        if let Some(Command::RescheduleIntent { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule when creating streaming job with snapshot backfill",
                    )
                    .into(),
                );
            }
            return Ok(());
        }

        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            for mut notifier in notifiers {
                // There is nothing to run for the command; finish it immediately.
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        let ApplyCommandInfo {
            mv_subscription_max_retention,
            table_ids_to_commit,
            jobs_to_wait,
            node_to_collect,
            command,
        } = match self.apply_command(
            command,
            &mut notifiers,
            &barrier_info,
            control_stream_manager,
            hummock_version_stats,
        ) {
            Ok(info) => info,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            mv_subscription_max_retention,
            table_ids_to_commit,
            command,
            span,
        );

        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}