use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::{Future, poll_fn};
use std::mem::take;
use std::task::Poll;

use anyhow::anyhow;
use fail::fail_point;
use prometheus::HistogramTimer;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use thiserror_ext::AsReport;
use tracing::{debug, warn};

use crate::barrier::checkpoint::creating_job::{CompleteJobType, CreatingStreamingJobControl};
use crate::barrier::checkpoint::recovery::{
    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
    RecoveringStateAction,
};
use crate::barrier::checkpoint::state::BarrierWorkerState;
use crate::barrier::command::CommandContext;
use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
use crate::barrier::info::{InflightStreamingJobInfo, SharedActorInfos};
use crate::barrier::notifier::Notifier;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
use crate::barrier::rpc::{ControlStreamManager, from_partial_graph_id};
use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
use crate::barrier::utils::{
    NodeToCollect, collect_creating_job_commit_epoch_info, is_valid_after_worker_err,
};
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, InflightSubscriptionInfo};
use crate::manager::MetaSrvEnv;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};

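/// Cluster-wide checkpoint state owned by the global barrier worker: one
/// [`DatabaseCheckpointControlStatus`] per database, plus the latest
/// [`HummockVersionStats`] observed from completed barriers.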
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    pub(super) hummock_version_stats: HummockVersionStats,
}

impl CheckpointControl {
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

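    /// Rebuild the control state after recovery: successfully recovered databases resume as
    /// `Running`, while databases that failed recovery re-enter the resetting `Recovering`
    /// state. Shared actor info is pruned to the databases that still exist.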
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashSet<DatabaseId>,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(&failed_databases).cloned());
        Self {
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(|database_id| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Recovering(
                            DatabaseRecoveringState::resetting(database_id, control_stream_manager),
                        ),
                    )
                }))
                .collect(),
            hummock_version_stats,
        }
    }

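    /// Apply the result of a completed barrier task: refresh the hummock version stats and ack
    /// the corresponding epochs on each database covered by the task.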
    pub(crate) fn ack_completed(&mut self, output: BarrierCompleteOutput) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have waited for the completing command before entering recovery")
                .ack_completed(command_prev_epoch, creating_job_epochs);
        }
    }

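    /// Gather the next barrier-completion work across all running databases, merging the
    /// per-database pieces into a single [`CompleteBarrierTask`] if any database has one ready.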
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            let context = context.as_mut().map(|(s, c)| (&mut **s, &mut **c));
            database.next_complete_barrier_task(&mut task, context, &self.hummock_version_stats);
        }
        task
    }

    pub(crate) fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let database_id = DatabaseId::new(resp.database_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(resp, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.barrier_collected(database_id, resp);
                Ok(())
            }
        }
    }

    pub(crate) fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.databases.values().all(|database| {
            database
                .running_state()
                .map(|database| database.can_inject_barrier(in_flight_barrier_nums))
                .unwrap_or(true)
        })
    }

    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

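    /// Handle a newly scheduled barrier: resolve the target database (creating its checkpoint
    /// control for a normal `CreateStreamingJob` on an empty database), pre-process the command,
    /// and delegate the actual injection to the database-level `handle_new_barrier`.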
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        control_stream_manager: &mut ControlStreamManager,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.state.inflight_graph_info.contains_job(*table_id)
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob {
                        job_type: CreateStreamingJobType::Normal,
                        ..
                    } => {
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        control_stream_manager.add_partial_graph(database_id, None);
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    Command::Flush | Command::Pause | Command::Resume => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    _ => {
                        panic!(
                            "new database graph info can only be created for normal creating streaming job, but got command: {} {:?}",
                            database_id, command
                        )
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            database.handle_new_barrier(
                None,
                checkpoint,
                span.clone(),
                control_stream_manager,
                &self.hummock_version_stats,
            )
        }
    }

    pub(crate) fn update_barrier_nums_metrics(&self) {
        self.databases
            .values()
            .flat_map(|database| database.running_state())
            .for_each(|database| database.update_barrier_nums_metrics());
    }

    pub(crate) fn gen_ddl_progress(&self) -> HashMap<u32, DdlProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .create_mview_tracker
                    .gen_ddl_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend([(
                    creating_job.job_id.table_id,
                    creating_job.gen_ddl_progress(),
                )]);
            }
        }
        progress
    }

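    /// Given a failed worker, return the databases that can no longer make progress and must be
    /// reset: a database fails if any of its in-flight barriers, its inflight graph, or one of
    /// its creating jobs still depends on that worker.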
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> Vec<DatabaseId> {
        let mut failed_databases = Vec::new();
        for (database_id, database_status) in &mut self.databases {
            let database_checkpoint_control = match database_status {
                DatabaseCheckpointControlStatus::Running(control) => control,
                DatabaseCheckpointControlStatus::Recovering(state) => {
                    if !state.is_valid_after_worker_err(worker_id) {
                        failed_databases.push(*database_id);
                    }
                    continue;
                }
            };

            if !database_checkpoint_control.is_valid_after_worker_err(worker_id as _)
                || database_checkpoint_control
                    .state
                    .inflight_graph_info
                    .contains_worker(worker_id as _)
                || database_checkpoint_control
                    .creating_streaming_job_controls
                    .values_mut()
                    .any(|job| !job.is_valid_after_worker_err(worker_id))
            {
                failed_databases.push(*database_id);
            }
        }
        failed_databases
    }

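    /// Abort all queued barriers with the given error: notify every pending notifier of the
    /// collection failure, stop their latency timers, and abort all in-progress mview tracking.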
    pub(crate) fn clear_on_err(&mut self, err: &MetaError) {
        for (_, node) in self.databases.values_mut().flat_map(|status| {
            status
                .running_state_mut()
                .map(|database| take(&mut database.command_ctx_queue))
                .into_iter()
                .flatten()
        }) {
            for notifier in node.notifiers {
                notifier.notify_collection_failed(err.clone());
            }
            node.enqueue_time.observe_duration();
        }
        self.databases.values_mut().for_each(|database| {
            if let Some(database) = database.running_state_mut() {
                database.create_mview_tracker.abort_all()
            }
        });
    }

    pub(crate) fn inflight_infos(
        &self,
    ) -> impl Iterator<
        Item = (
            DatabaseId,
            &InflightSubscriptionInfo,
            impl Iterator<Item = TableId> + '_,
        ),
    > + '_ {
        self.databases.iter().flat_map(|(database_id, database)| {
            database
                .database_state()
                .map(|(database_state, creating_jobs)| {
                    (
                        *database_id,
                        &database_state.inflight_subscription_info,
                        creating_jobs
                            .values()
                            .filter_map(|job| job.is_consuming().then_some(job.job_id)),
                    )
                })
        })
    }
}

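/// Events emitted by [`CheckpointControl::next_event`] when a recovering database is ready to
/// move on to its next recovery phase.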
pub(crate) enum CheckpointControlEvent<'a> {
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}

impl CheckpointControl {
    pub(crate) fn on_reset_database_resp(
        &mut self,
        worker_id: WorkerId,
        resp: ResetDatabaseResponse,
    ) {
        let database_id = DatabaseId::new(resp.database_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not receive reset database resp when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_reset_database_resp(worker_id, resp)
            }
        }
    }

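    /// Wait for the next recovery-related event. The returned future polls every recovering
    /// database and resolves as soon as one of them is ready to enter the initializing or
    /// running phase; it stays pending while all databases are running.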
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}

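/// Per-database status: either `Running` with the normal barrier flow, or `Recovering` while the
/// database is being reset and re-initialized after a failure.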
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}

impl DatabaseCheckpointControlStatus {
    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => Some(state),
            DatabaseCheckpointControlStatus::Recovering(_) => None,
        }
    }

    fn database_state(
        &self,
    ) -> Option<(
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    )> {
        match self {
            DatabaseCheckpointControlStatus::Running(control) => {
                Some((&control.state, &control.creating_streaming_job_controls))
            }
            DatabaseCheckpointControlStatus::Recovering(state) => state.database_state(),
        }
    }

    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
        match self {
            DatabaseCheckpointControlStatus::Running(state) => state,
            DatabaseCheckpointControlStatus::Recovering(_) => {
                panic!("should be at running: {}", reason)
            }
        }
    }
}

struct DatabaseCheckpointControlMetrics {
    barrier_latency: LabelGuardedHistogram,
    in_flight_barrier_nums: LabelGuardedIntGauge,
    all_barrier_nums: LabelGuardedIntGauge,
}

impl DatabaseCheckpointControlMetrics {
    fn new(database_id: DatabaseId) -> Self {
        let database_id_str = database_id.database_id.to_string();
        let barrier_latency = GLOBAL_META_METRICS
            .barrier_latency
            .with_guarded_label_values(&[&database_id_str]);
        let in_flight_barrier_nums = GLOBAL_META_METRICS
            .in_flight_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        let all_barrier_nums = GLOBAL_META_METRICS
            .all_barrier_nums
            .with_guarded_label_values(&[&database_id_str]);
        Self {
            barrier_latency,
            in_flight_barrier_nums,
            all_barrier_nums,
        }
    }
}

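/// Barrier checkpoint state for a single running database: the queue of in-flight barriers, the
/// barrier currently being committed, the last committed epoch, the creating (snapshot-backfill)
/// jobs that run on their own partial graphs, and the mview creation progress tracker.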
pub(crate) struct DatabaseCheckpointControl {
    database_id: DatabaseId,
    state: BarrierWorkerState,

    command_ctx_queue: BTreeMap<u64, EpochNode>,
    completing_barrier: Option<u64>,

    committed_epoch: Option<u64>,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,

    create_mview_tracker: CreateMviewProgressTracker,

    metrics: DatabaseCheckpointControlMetrics,
}

impl DatabaseCheckpointControl {
    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            state: BarrierWorkerState::new(database_id, shared_actor_infos),
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: None,
            creating_streaming_job_controls: Default::default(),
            create_mview_tracker: Default::default(),
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    pub(crate) fn recovery(
        database_id: DatabaseId,
        create_mview_tracker: CreateMviewProgressTracker,
        state: BarrierWorkerState,
        committed_epoch: u64,
        creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    ) -> Self {
        Self {
            database_id,
            state,
            command_ctx_queue: Default::default(),
            completing_barrier: None,
            committed_epoch: Some(committed_epoch),
            creating_streaming_job_controls,
            create_mview_tracker,
            metrics: DatabaseCheckpointControlMetrics::new(database_id),
        }
    }

    fn total_command_num(&self) -> usize {
        self.command_ctx_queue.len()
            + match &self.completing_barrier {
                Some(_) => 1,
                None => 0,
            }
    }

    fn update_barrier_nums_metrics(&self) {
        self.metrics.in_flight_barrier_nums.set(
            self.command_ctx_queue
                .values()
                .filter(|x| x.state.is_inflight())
                .count() as i64,
        );
        self.metrics
            .all_barrier_nums
            .set(self.total_command_num() as i64);
    }

    fn jobs_to_merge(
        &self,
    ) -> Option<HashMap<TableId, (HashSet<TableId>, InflightStreamingJobInfo)>> {
        let mut table_ids_to_merge = HashMap::new();

        for (table_id, creating_streaming_job) in &self.creating_streaming_job_controls {
            if let Some(graph_info) = creating_streaming_job.should_merge_to_upstream() {
                table_ids_to_merge.insert(
                    *table_id,
                    (
                        creating_streaming_job
                            .snapshot_backfill_upstream_tables
                            .clone(),
                        graph_info.clone(),
                    ),
                );
            }
        }
        if table_ids_to_merge.is_empty() {
            None
        } else {
            Some(table_ids_to_merge)
        }
    }

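    /// Enqueue an injected barrier and start its latency timer. Barriers must be enqueued in
    /// epoch order: the new barrier's `prev_epoch` has to match the `curr_epoch` of the last
    /// queued barrier.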
    fn enqueue_command(
        &mut self,
        command_ctx: CommandContext,
        notifiers: Vec<Notifier>,
        node_to_collect: NodeToCollect,
        creating_jobs_to_wait: HashSet<TableId>,
    ) {
        let timer = self.metrics.barrier_latency.start_timer();

        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
            assert_eq!(
                command_ctx.barrier_info.prev_epoch.value(),
                node.command_ctx.barrier_info.curr_epoch.value()
            );
        }

        tracing::trace!(
            prev_epoch = command_ctx.barrier_info.prev_epoch(),
            ?creating_jobs_to_wait,
            ?node_to_collect,
            "enqueue command"
        );
        self.command_ctx_queue.insert(
            command_ctx.barrier_info.prev_epoch(),
            EpochNode {
                enqueue_time: timer,
                state: BarrierEpochState {
                    node_to_collect,
                    resps: vec![],
                    creating_jobs_to_wait,
                    finished_jobs: HashMap::new(),
                },
                command_ctx,
                notifiers,
            },
        );
    }

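    /// Record a `BarrierCompleteResponse` from a worker node. Responses on the default partial
    /// graph are matched against the queued barrier by `prev_epoch`; responses from a creating
    /// (snapshot-backfill) job are forwarded to that job, and a checkpoint is forced in the next
    /// barrier once the job is ready to merge back into the upstream graph.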
    fn barrier_collected(
        &mut self,
        resp: BarrierCompleteResponse,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let worker_id = resp.worker_id;
        let prev_epoch = resp.epoch;
        tracing::trace!(
            worker_id,
            prev_epoch,
            partial_graph_id = resp.partial_graph_id,
            "barrier collected"
        );
        let creating_job_id = from_partial_graph_id(resp.partial_graph_id);
        match creating_job_id {
            None => {
                if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
                    assert!(
                        node.state
                            .node_to_collect
                            .remove(&(worker_id as _))
                            .is_some()
                    );
                    node.state.resps.push(resp);
                } else {
                    panic!(
                        "collect barrier on non-existing barrier: {}, {}",
                        prev_epoch, worker_id
                    );
                }
            }
            Some(creating_job_id) => {
                let should_merge_to_upstream = self
                    .creating_streaming_job_controls
                    .get_mut(&creating_job_id)
                    .expect("should exist")
                    .collect(resp);
                if should_merge_to_upstream {
                    periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                }
            }
        }
        Ok(())
    }

    fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
        self.command_ctx_queue
            .values()
            .filter(|x| x.state.is_inflight())
            .count()
            < in_flight_barrier_nums
    }

    pub(crate) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        for epoch_node in self.command_ctx_queue.values_mut() {
            if !is_valid_after_worker_err(&mut epoch_node.state.node_to_collect, worker_id) {
                return false;
            }
        }
        true
    }
}

impl DatabaseCheckpointControl {
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<TableId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .filter_map(|(table_id, creating_job)| {
                creating_job
                    .pinned_upstream_log_epoch()
                    .map(|progress_epoch| {
                        (
                            *table_id,
                            (
                                progress_epoch,
                                creating_job.snapshot_backfill_upstream_tables.clone(),
                            ),
                        )
                    })
            })
            .collect()
    }

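    /// Pick the next piece of completion work for this database: first drain completable epochs
    /// from the creating (snapshot-backfill) jobs, then pop the oldest fully collected barrier
    /// from the queue (if any) into the shared [`CompleteBarrierTask`], marking it as the
    /// barrier currently being committed.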
    fn next_complete_barrier_task(
        &mut self,
        task: &mut Option<CompleteBarrierTask>,
        mut context: Option<(&mut PeriodicBarriers, &mut ControlStreamManager)>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = self
                .command_ctx_queue
                .first_key_value()
                .map(|(epoch, _)| *epoch);
            for (table_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, status)) =
                    job.start_completing(min_upstream_inflight_barrier)
                {
                    let is_first_time = match status {
                        CompleteJobType::First => true,
                        CompleteJobType::Normal => false,
                        CompleteJobType::Finished => {
                            finished_jobs.push((*table_id, epoch, resps));
                            continue;
                        }
                    };
                    creating_jobs_task.push((*table_id, epoch, resps, is_first_time));
                }
            }
            if !finished_jobs.is_empty()
                && let Some((_, control_stream_manager)) = &mut context
            {
                control_stream_manager.remove_partial_graph(
                    self.database_id,
                    finished_jobs
                        .iter()
                        .map(|(table_id, _, _)| *table_id)
                        .collect(),
                );
            }
            for (table_id, epoch, resps) in finished_jobs {
                let epoch_state = &mut self
                    .command_ctx_queue
                    .get_mut(&epoch)
                    .expect("should exist")
                    .state;
                assert!(epoch_state.creating_jobs_to_wait.remove(&table_id));
                debug!(epoch, ?table_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&table_id)
                    .expect("should exist");
                assert!(creating_streaming_job.is_finished());
                assert!(epoch_state.finished_jobs.insert(table_id, resps).is_none());
            }
        }
        assert!(self.completing_barrier.is_none());
        while let Some((_, EpochNode { state, .. })) = self.command_ctx_queue.first_key_value()
            && !state.is_inflight()
        {
            {
                let (_, mut node) = self.command_ctx_queue.pop_first().expect("non-empty");
                assert!(node.state.creating_jobs_to_wait.is_empty());
                assert!(node.state.node_to_collect.is_empty());

                let load_finished_source_ids: Vec<_> = node
                    .state
                    .resps
                    .iter()
                    .flat_map(|resp| &resp.load_finished_source_ids)
                    .cloned()
                    .collect();
                if !load_finished_source_ids.is_empty() {
                    let task = task.get_or_insert_default();
                    task.load_finished_source_ids
                        .extend(load_finished_source_ids);
                }

                let mut finished_jobs = self.create_mview_tracker.apply_collected_command(
                    node.command_ctx.command.as_ref(),
                    &node.command_ctx.barrier_info,
                    &node.state.resps,
                    hummock_version_stats,
                );
                if !node.command_ctx.barrier_info.kind.is_checkpoint() {
                    assert!(finished_jobs.is_empty());
                    node.notifiers.into_iter().for_each(|notifier| {
                        notifier.notify_collected();
                    });
                    if let Some((periodic_barriers, _)) = &mut context
                        && self.create_mview_tracker.has_pending_finished_jobs()
                        && self
                            .command_ctx_queue
                            .values()
                            .all(|node| !node.command_ctx.barrier_info.kind.is_checkpoint())
                    {
                        periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
                    }
                    continue;
                }
                node.state
                    .finished_jobs
                    .drain()
                    .for_each(|(job_id, resps)| {
                        node.state.resps.extend(resps);
                        finished_jobs.push(TrackingJob::New(TrackingCommand { job_id }));
                    });
                let task = task.get_or_insert_default();
                node.command_ctx.collect_commit_epoch_info(
                    &mut task.commit_info,
                    take(&mut node.state.resps),
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
                task.finished_jobs.extend(finished_jobs);
                task.notifiers.extend(node.notifiers);
                task.epoch_infos
                    .try_insert(
                        self.database_id,
                        (Some((node.command_ctx, node.enqueue_time)), vec![]),
                    )
                    .expect("non duplicate");
                break;
            }
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (table_id, epoch, resps, is_first_time) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(
                    &mut task.commit_info,
                    epoch,
                    resps,
                    self.creating_streaming_job_controls[&table_id].state_table_ids(),
                    is_first_time,
                );
                let (_, creating_job_epochs) =
                    task.epoch_infos.entry(self.database_id).or_default();
                creating_job_epochs.push((table_id, epoch));
            }
        }
    }

    fn ack_completed(
        &mut self,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(TableId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (table_id, epoch) in creating_job_epochs {
                self.creating_streaming_job_controls
                    .get_mut(&table_id)
                    .expect("should exist")
                    .ack_completed(epoch)
            }
        }
    }
}

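/// The state of a single barrier in the queue, keyed by its `prev_epoch`.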
struct EpochNode {
    enqueue_time: HistogramTimer,

    state: BarrierEpochState,

    command_ctx: CommandContext,
    notifiers: Vec<Notifier>,
}

#[derive(Debug)]
struct BarrierEpochState {
    node_to_collect: NodeToCollect,

    resps: Vec<BarrierCompleteResponse>,

    creating_jobs_to_wait: HashSet<TableId>,

    finished_jobs: HashMap<TableId, Vec<BarrierCompleteResponse>>,
}

impl BarrierEpochState {
    fn is_inflight(&self) -> bool {
        !self.node_to_collect.is_empty() || !self.creating_jobs_to_wait.is_empty()
    }
}

impl DatabaseCheckpointControl {
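    /// Build and inject the next barrier for this database: pre-process the command (rejecting
    /// ones that cannot run in the current state), create snapshot-backfill job control if
    /// needed, apply the command to the in-flight state, inject the barrier via the control
    /// stream, and enqueue it to wait for collection.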
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        control_stream_manager: &mut ControlStreamManager,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<()> {
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        let (mut command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        for table_to_cancel in command
            .as_ref()
            .map(Command::tables_to_drop)
            .into_iter()
            .flatten()
        {
            if self
                .creating_streaming_job_controls
                .contains_key(&table_to_cancel)
            {
                warn!(
                    table_id = table_to_cancel.table_id,
                    "ignore cancel command on creating streaming job"
                );
                for notifier in notifiers {
                    notifier
                        .notify_start_failed(anyhow!("cannot cancel creating streaming job, the job will continue creating until created or recovery").into());
                }
                return Ok(());
            }
        }

        if let Some(Command::RescheduleFragment { .. }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            warn!("ignore reschedule when creating streaming job with snapshot backfill");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!(
                        "cannot reschedule when creating streaming job with snapshot backfill",
                    )
                    .into(),
                );
            }
            return Ok(());
        }

        let Some(barrier_info) =
            self.state
                .next_barrier_info(command.as_ref(), checkpoint, curr_epoch)
        else {
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        let mut edges = self
            .state
            .inflight_graph_info
            .build_edge(command.as_ref(), &*control_stream_manager);

        if let Some(Command::CreateStreamingJob {
            job_type,
            info,
            cross_db_snapshot_backfill_info,
        }) = &mut command
        {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            None,
                            cross_db_snapshot_backfill_info,
                        )?;
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
                    if self.state.is_paused() {
                        warn!("cannot create streaming job with snapshot backfill when paused");
                        for notifier in notifiers {
                            notifier.notify_start_failed(
                                anyhow!("cannot create streaming job with snapshot backfill when paused",)
                                    .into(),
                            );
                        }
                        return Ok(());
                    }
                    for snapshot_backfill_epoch in snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .values_mut()
                    {
                        assert_eq!(
                            snapshot_backfill_epoch.replace(barrier_info.prev_epoch()),
                            None,
                            "must not set previously"
                        );
                    }
                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
                        if let Err(e) = fill_snapshot_backfill_epoch(
                            &mut fragment.nodes,
                            Some(snapshot_backfill_info),
                            cross_db_snapshot_backfill_info,
                        ) {
                            warn!(e = %e.as_report(), "failed to fill snapshot backfill epoch");
                            for notifier in notifiers {
                                notifier.notify_start_failed(e.clone());
                            }
                            return Ok(());
                        };
                    }
                    let job_id = info.stream_job_fragments.stream_job_id();
                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
                        .upstream_mv_table_id_to_backfill_epoch
                        .keys()
                        .cloned()
                        .collect();

                    let job = CreatingStreamingJobControl::new(
                        info,
                        snapshot_backfill_upstream_tables,
                        barrier_info.prev_epoch(),
                        hummock_version_stats,
                        control_stream_manager,
                        edges.as_mut().expect("should exist"),
                    )?;

                    self.state
                        .inflight_graph_info
                        .shared_actor_infos
                        .upsert(self.database_id, job.graph_info().fragment_infos.values());

                    self.creating_streaming_job_controls.insert(job_id, job);
                }
            }
        }

        if let (BarrierKind::Checkpoint(_), None) = (&barrier_info.kind, &command) {
            if let Some(jobs_to_merge) = self.jobs_to_merge() {
                command = Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge));
            } else {
                let pending_backfill_nodes =
                    self.create_mview_tracker.take_pending_backfill_nodes();
                if !pending_backfill_nodes.is_empty() {
                    command = Some(Command::StartFragmentBackfill {
                        fragment_ids: pending_backfill_nodes,
                    });
                }
            }
        }

        let command = command;

        let (
            pre_applied_graph_info,
            pre_applied_subscription_info,
            table_ids_to_commit,
            jobs_to_wait,
            prev_paused_reason,
        ) = self.state.apply_command(command.as_ref());

        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch.value().0, "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch.value().0);

        for creating_job in &mut self.creating_streaming_job_controls.values_mut() {
            creating_job.on_new_command(control_stream_manager, command.as_ref(), &barrier_info)?;
        }

        let node_to_collect = match control_stream_manager.inject_command_ctx_barrier(
            self.database_id,
            command.as_ref(),
            &barrier_info,
            prev_paused_reason,
            &pre_applied_graph_info,
            &self.state.inflight_graph_info,
            &mut edges,
        ) {
            Ok(node_to_collect) => node_to_collect,
            Err(err) => {
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        notifiers.iter_mut().for_each(|n| n.notify_started());

        let command_ctx = CommandContext::new(
            barrier_info,
            pre_applied_subscription_info,
            table_ids_to_commit.clone(),
            command,
            span,
        );

        self.enqueue_command(command_ctx, notifiers, node_to_collect, jobs_to_wait);

        Ok(())
    }
}