use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{
    BarrierKind, Command, CreateStreamingJobType, InflightSubscriptionInfo, TracedEpoch,
};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};

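/// Top-level control for barrier checkpointing across all databases. Each
/// database checkpoints independently and is either `Running` or `Recovering`,
/// so a failure in one database does not block the others.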
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

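    /// Rebuild the control state on recovery: successfully recovered databases
    /// resume as `Running`, while the failed ones start resetting as `Recovering`.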
    pub(crate) fn recover(
        databases: impl IntoIterator<Item = (DatabaseId, DatabaseCheckpointControl)>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
    ) -> Self {
        Self {
            env: control_stream_manager.env.clone(),
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

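    /// Acknowledge a completed `CompleteBarrierTask`: refresh the hummock
    /// version stats and ack the completed epochs on each involved database.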
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

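    /// Collect the next barrier-complete task, if any, from the running databases.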
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

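    /// Dispatch a `BarrierCompleteResponse` from a worker to the owning
    /// database, whether it is running or recovering.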
    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = DatabaseId::new(resp.database_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

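    /// Whether a new barrier can be injected, i.e. every running database is
    /// still below the in-flight barrier limit.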
    pub(crate) fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.databases.values().all(|database| {
            database
                .running_state()
                .map(|database| database.can_inject_barrier(in_flight_barrier_nums))
                .unwrap_or(true)
        })
    }

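    /// The maximum in-flight `prev_epoch` over all running databases, from
    /// which the `curr_epoch` of the next barrier is derived.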
    pub(crate) fn max_prev_epoch(&self) -> Option<TracedEpoch> {
        self.databases
            .values()
            .flat_map(|database| {
                database
                    .running_state()
                    .map(|database| database.state.in_flight_prev_epoch())
            })
            .max_by_key(|epoch| epoch.value())
            .cloned()
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

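    /// Handle a newly scheduled barrier: resolve the carried command (if any),
    /// derive the next epoch, and inject the barrier into every running
    /// database. Returns the databases whose injection failed, if any.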
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> Option<HashMap<DatabaseId, MetaError>> {
        let NewBarrier {
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((database_id, mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.state.inflight_graph_info.contains_job(*table_id)
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return None;
                    }
                }
            }

            let max_prev_epoch = self.max_prev_epoch();
            let (database, max_prev_epoch) = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => (
                    entry
                        .into_mut()
                        .expect_running("should not have command when not running"),
                    max_prev_epoch.expect("should exist when having some database"),
                ),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(database_id);
                        let max_prev_epoch = if let Some(max_prev_epoch) = max_prev_epoch {
                            if max_prev_epoch.value()
                                < new_database.state.in_flight_prev_epoch().value()
                            {
                                new_database.state.in_flight_prev_epoch().clone()
                            } else {
                                max_prev_epoch
                            }
                        } else {
                            new_database.state.in_flight_prev_epoch().clone()
                        };
                        control_stream_manager.add_partial_graph(database_id, None);
                        (
                            entry
                                .insert(DatabaseCheckpointControlStatus::Running(new_database))
                                .expect_running("just initialized as running"),
                            max_prev_epoch,
                        )
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return None;
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but got command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            let curr_epoch = max_prev_epoch.next();

            let mut failed_databases: Option<HashMap<_, _>> = None;

            if let Err(e) = database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
                curr_epoch.clone(),
            ) {
                failed_databases
                    .get_or_insert_default()
                    .try_insert(database_id, e)
                    .expect("non-duplicate");
            }
            for database in self.databases.values_mut() {
                let Some(database) = database.running_state_mut() else {
                    continue;
                };
                if database.database_id == database_id {
                    continue;
                }
                if let Err(e) = database.handle_new_barrier(
                    None,
                    checkpoint,
                    span.clone(),
                    control_stream_manager,
                    &self.hummock_version_stats,
                    curr_epoch.clone(),
                ) {
                    failed_databases
                        .get_or_insert_default()
                        .try_insert(database.database_id, e)
                        .expect("non-duplicate");
                }
            }
            failed_databases
        } else {
            #[expect(clippy::question_mark)]
            let Some(max_prev_epoch) = self.max_prev_epoch() else {
                return None;
            };
            let curr_epoch = max_prev_epoch.next();

            let mut failed_databases: Option<HashMap<_, _>> = None;
            for database in self.databases.values_mut() {
                let Some(database) = database.running_state_mut() else {
                    continue;
                };
                if let Err(e) = database.handle_new_barrier(
                    None,
                    checkpoint,
                    span.clone(),
                    control_stream_manager,
                    &self.hummock_version_stats,
                    curr_epoch.clone(),
                ) {
                    failed_databases
                        .get_or_insert_default()
                        .try_insert(database.database_id, e)
                        .expect("non-duplicate");
                }
            }

            failed_databases
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

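    /// Aggregate the DDL progress of normal and snapshot backfill jobs across
    /// all running databases, keyed by job id.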
    pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            // Progress of normal backfill
            progress.extend(
                database_checkpoint_control
                    .create_mview_tracker
                    .gen_ddl_progress(),
            );
            // Progress of snapshot backfill
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(
                    creating_job.job_id.table_id,
                    creating_job.gen_ddl_progress(),
                )]);
            }
        }
        progress
    }

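    /// On the failure of worker `worker_id`, return the databases that can no
    /// longer make progress and hence have to enter recovery.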
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

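    /// Fail all enqueued barriers with `err`, notifying their subscribers, and
    /// abort all tracked `CREATE` jobs.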
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
        self.databases.values_mut().for_each(|database| {
            if let Some(database) = database.running_state_mut() {
                database.create_mview_tracker.abort_all()
            }
        });
    }

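    /// Iterate over the inflight graph and subscription info of every database
    /// whose state is known, along with its creating snapshot-backfill jobs.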
    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<
        Item = (
            DatabaseId,
            &InflightSubscriptionInfo,
            &InflightDatabaseInfo,
            impl Iterator<Item = &InflightStreamingJobInfo> + '_,
        ),
    > + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database
                .database_state()
                .map(|(database_state, creating_jobs)| {
                    (
                        *database_id,
                        &database_state.inflight_subscription_info,
                        &database_state.inflight_graph_info,
                        creating_jobs
                            .values()
                            .filter_map(|job| job.inflight_graph_info()),
                    )
                })
        })
    }
}

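/// Events yielded by [`CheckpointControl::next_event`] when a recovering
/// database is ready for its next recovery stage.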
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = DatabaseId::new(resp.database_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

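    /// Poll the recovering databases for their next state transition. The
    /// returned future resolves once some database can enter the initializing
    /// or running stage, and stays pending otherwise.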
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

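/// Checkpoint status of a single database: either running normally or going
/// through per-database recovery.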
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

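/// Per-database barrier metrics, guarded by the database id label.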
struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram<1>,
    in_flight_barrier_nums: LabelGuardedIntGauge<1>,
    all_barrier_nums: LabelGuardedIntGauge<1>,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

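/// Controls barrier checkpointing for a single database: the queue of
/// in-flight barriers, the barrier being committed, and the creating
/// snapshot-backfill jobs that checkpoint through their own partial graphs.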
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

    /// Save the state and message of barriers in order.
    /// Key is the `prev_epoch`.
    command_ctx_queue: BTreeMap<u64, EpochNode>,
    /// The barrier that is being committed, if any. `Some(prev_epoch)`.
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,

    create_mview_tracker: CreateMviewProgressTracker,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(database_id: DatabaseId) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            create_mview_tracker: Default::default(),
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

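    /// Rebuild the control state of a database recovered at `committed_epoch`,
    /// carrying over the progress tracker of unfinished `CREATE` jobs.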
    pub(crate) fn recovery(
        database_id: DatabaseId,
        create_mview_tracker: CreateMviewProgressTracker,
        state: BarrierWorkerState,
        committed_epoch: u64,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls: Default::default(),
            create_mview_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    /// Update the metrics of barrier nums.
    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

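    /// Return the snapshot-backfill jobs that have caught up and should be
    /// merged back into the upstream graph, with their upstream tables and
    /// graph info.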
    fn jobs_to_merge(
        &self,
    ) -> Option<HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>> {
        let mut table_ids_to_merge = HashMap::new();

        for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
                table_ids_to_merge.insert(
                    *table_id,
                    (
                        creating_streaming_job
                            .snapshot_backfill_upstream_tables
                            .clone(),
                        graph_info.clone(),
                    ),
                );
            }
        }
        if table_ids_to_merge.is_empty() {
            None
        } else {
            Some(table_ids_to_merge)
        }
    }

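    /// Enqueue an injected barrier, recording the worker nodes and creating
    /// jobs it still has to be collected from.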
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<TableId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

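    /// Mark the barrier of `prev_epoch` as collected from one worker. For the
    /// barrier of a creating job, additionally force a checkpoint on the next
    /// barrier once the job becomes ready to merge into the upstream graph.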
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(
                        node.state
                            .node_to_collect
                            .remove(&(worker_id as _))
                            .is_some()
                    );
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(prev_epoch, worker_id as _, resp)?;
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier();
                }
            }
        }
        Ok(())
    }

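    /// Whether the number of in-flight barriers is below `in_flight_barrier_nums`.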
    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

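    /// Whether the database can keep running after the failure of `worker_id`,
    /// i.e. no enqueued barrier is still pending collection from that worker.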
    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
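    /// For each unfinished snapshot-backfill job, collect the upstream log
    /// epoch it still pins together with its upstream table ids.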
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<TableId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(table_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *table_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

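    /// Assemble the next barrier-complete task of this database: first drain
    /// the completable epochs of creating jobs, then pop collected barriers
    /// from the front of the queue up to the first still-in-flight one.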
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        {
            // Collect the completable barriers of creating jobs, and pick out the
            // jobs that have finished and can be merged into their upstream epoch node.
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (table_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*table_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs
                        .iter()
                        .map(|(table_id, _, _)| *table_id)
                        .collect(),
                );
            }
            for (table_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&table_id));
                debug!(epoch, ?table_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&table_id)
                    .expect("should exist");
                assert!(creating_streaming_job.is_finished());
                assert!(epoch_state.finished_jobs.insert(table_id, resps).is_none());
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());
                let mut finished_jobs = self.create_mview_tracker.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.command_ctx.barrier_info,
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    assert!(finished_jobs.is_empty());
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.create_mview_tracker.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier();
                    }
                    continue;
                }
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(job_id, resps)| {
                        node.state.resps.extend(resps);
                        finished_jobs.push(TrackingJob::New(TrackingCommand {
                            job_id,
                            replace_stream_job: None,
                        }));
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(finished_jobs);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (table_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&table_id].state_table_ids(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((table_id, epoch));
            }
        }
    }

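    /// Acknowledge that the completing barrier (and the given epochs of
    /// creating jobs) have been committed, advancing `committed_epoch`.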
    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(TableId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (table_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&table_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }
}

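/// The state and messages of an enqueued barrier: a node of the concurrent
/// checkpoint queue.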
struct EpochNode {
    /// Timer recording the barrier latency; stopped when the barrier completes.
    enqueue_time: HistogramTimer,

    /// Collection state of the barrier: whether it is still in-flight.
    state: BarrierEpochState,

    /// Context of the command that this barrier carries.
    command_ctx: CommandContext,
    /// Notifiers to be called when the barrier is collected or fails.
    notifiers: Vec<Notifier>,
}

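/// Collection state of an enqueued barrier: what it still waits on and the
/// responses collected so far.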
#[derive(Debug)]
struct BarrierEpochState {
    /// The worker nodes from which the barrier is still to be collected.
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<TableId>,

    finished_jobs: HashMap<TableId, Vec<BarrierCompleteResponse>>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
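    /// Handle a new barrier for this database: validate the command, fill in
    /// snapshot-backfill epochs, apply the command to the in-flight state,
    /// inject the barrier via the control stream, and enqueue it for collection.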
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
        curr_epoch: TracedEpoch,
    ) -> MetaResult<()> {
        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for table_to_cancel in command
            .as_ref()
            .map(Command::tables_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&table_to_cancel)
            {
                warn!(
                    table_id = table_to_cancel.table_id,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command {
            if !self.creating_streaming_job_controls.is_empty() {
                warn!("ignore reschedule when creating streaming job with snapshot backfill");
                for notifier in notifiers {
                    notifier.notify_start_failed(
                        anyhow!(
                            "cannot reschedule when creating streaming job with snapshot backfill",
                        )
                        .into(),
                    );
                }
                return Ok(());
            }
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            // No new barrier to inject: notify the command as started and
            // collected right away.
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self.state.inflight_graph_info.build_edge(command.as_ref());

        // Insert the newly added creating job.
        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    // Set the snapshot epoch of the upstream tables to the barrier's prev epoch.
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    self.creating_streaming_job_controls.insert(
                        job_id,
                        CreatingStreamingJobControl::new(
                            info,
                            snapshot_backfill_upstream_tables,
                            barrier_info.prev_epoch(),
                            hummock_version_stats,
                            control_stream_manager,
                            edges.as_mut().expect("should exist"),
                        )?,
                    );
                }
            }
        }

        // On an empty checkpoint barrier, merge the snapshot backfill jobs that
        // have caught up with their upstream.
        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command)
            && let Some(jobs_to_merge) = self.jobs_to_merge()
        {
            command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
        }

        let command = command;

        let (
            pre_applied_graph_info,
            pre_applied_subscription_info,
            table_ids_to_commit,
            jobs_to_wait,
            prev_paused_reason,
        ) = self.state.apply_command(command.as_ref());

        // Tracing related stuff
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
        }

        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
            self.database_id,
            command.as_ref(),
            &barrier_info,
            prev_paused_reason,
            &pre_applied_graph_info,
            &self.state.inflight_graph_info,
            &mut edges,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Notify about the injection.
        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            pre_applied_subscription_info,
            table_ids_to_commit.clone(),
            command,
            span,
        );

        // Record the in-flight barrier.
        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}