1use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::ops::Bound::{Excluded, Unbounded};
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use itertools::Itertools;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
27use risingwave_common::system_param::AdaptiveParallelismStrategy;
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
30use risingwave_meta_model::WorkerId;
31use risingwave_pb::common::WorkerNode;
32use risingwave_pb::hummock::HummockVersionStats;
33use risingwave_pb::id::{FragmentId, PartialGraphId};
34use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
35use risingwave_pb::stream_plan::stream_node::NodeBody;
36use risingwave_pb::stream_service::BarrierCompleteResponse;
37use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
38use tracing::{debug, warn};
39
40use crate::barrier::cdc_progress::CdcProgress;
41use crate::barrier::checkpoint::creating_job::CreatingStreamingJobControl;
42use crate::barrier::checkpoint::recovery::{
43 DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
44 RecoveringStateAction,
45};
46use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
47use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
48use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
49use crate::barrier::notifier::Notifier;
50use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphStat};
51use crate::barrier::progress::TrackingJob;
52use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
53use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
54use crate::barrier::utils::{BarrierItemCollector, collect_creating_job_commit_epoch_info};
55use crate::barrier::{
56 BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
57};
58use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
59use crate::manager::MetaSrvEnv;
60use crate::model::ActorId;
61use crate::rpc::metrics::GLOBAL_META_METRICS;
62use crate::{MetaError, MetaResult};
63
/// Top-level coordinator of barrier checkpointing across all databases managed
/// by this meta node.
pub(crate) struct CheckpointControl {
    pub(crate) env: MetaSrvEnv,
    /// Per-database checkpoint state: either running normally or in recovery.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Latest hummock version stats; refreshed from each completed barrier
    /// output in `ack_completed`.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// Upper bound on in-flight barriers per database, copied from
    /// `env.opts.in_flight_barrier_nums` on construction.
    pub(crate) in_flight_barrier_nums: usize,
}
71
impl CheckpointControl {
    /// Create an empty control with no databases; used on a fresh start.
    pub fn new(env: MetaSrvEnv) -> Self {
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: Default::default(),
            hummock_version_stats: Default::default(),
        }
    }

    /// Rebuild the control after recovery.
    ///
    /// `databases` resume as `Running`; `failed_databases` enter the
    /// `Recovering` state with their partial graphs marked as resetting.
    /// Shared actor info is pruned to only the databases known here.
    pub(crate) fn recover(
        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
        failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>,
        hummock_version_stats: HummockVersionStats,
        env: MetaSrvEnv,
    ) -> Self {
        env.shared_actor_infos()
            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
        Self {
            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
            env,
            databases: databases
                .into_iter()
                .map(|(database_id, control)| {
                    (
                        database_id,
                        DatabaseCheckpointControlStatus::Running(control),
                    )
                })
                .chain(failed_databases.into_iter().map(
                    |(database_id, resetting_partial_graphs)| {
                        (
                            database_id,
                            DatabaseCheckpointControlStatus::Recovering(
                                DatabaseRecoveringState::new_resetting(
                                    database_id,
                                    resetting_partial_graphs,
                                ),
                            ),
                        )
                    },
                ))
                .collect(),
            hummock_version_stats,
        }
    }

    /// Apply the output of a completed barrier task: refresh hummock stats and
    /// forward each database's acknowledged epochs to its running control.
    ///
    /// Panics if an acked database is missing or not running — completing
    /// commands are expected to be drained before a database enters recovery.
    pub(crate) fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        output: BarrierCompleteOutput,
    ) {
        self.hummock_version_stats = output.hummock_version_stats;
        for (database_id, (command_prev_epoch, creating_job_epochs)) in output.epochs_to_ack {
            self.databases
                .get_mut(&database_id)
                .expect("should exist")
                .expect_running("should have wait for completing command before enter recovery")
                .ack_completed(
                    partial_graph_manager,
                    command_prev_epoch,
                    creating_job_epochs,
                );
        }
    }

    /// Poll every running database for work to commit, accumulating it all
    /// into a single optional [`CompleteBarrierTask`]. Recovering databases
    /// are skipped.
    pub(crate) fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
    ) -> Option<CompleteBarrierTask> {
        let mut task = None;
        for database in self.databases.values_mut() {
            let Some(database) = database.running_state_mut() else {
                continue;
            };
            database.next_complete_barrier_task(
                periodic_barriers,
                partial_graph_manager,
                &mut task,
                &self.hummock_version_stats,
            );
        }
        task
    }

    /// Route a collected barrier to the owning database (derived from the
    /// partial graph id). Barriers arriving for a recovering database are a
    /// bug in debug builds and are logged-and-ignored in release builds.
    pub(crate) fn barrier_collected(
        &mut self,
        partial_graph_id: PartialGraphId,
        collected_barrier: CollectedBarrier<'_>,
        periodic_barriers: &mut PeriodicBarriers,
    ) -> MetaResult<()> {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        let database_status = self.databases.get_mut(&database_id).expect("should exist");
        match database_status {
            DatabaseCheckpointControlStatus::Running(database) => {
                database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
            }
            DatabaseCheckpointControlStatus::Recovering(_) => {
                if cfg!(debug_assertions) {
                    panic!(
                        "receive collected barrier {:?} on recovering database {} from partial graph {}",
                        collected_barrier, database_id, partial_graph_id
                    );
                } else {
                    warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
                }
                Ok(())
            }
        }
    }

    /// Ids of databases currently in recovery.
    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_none().then_some(*database_id)
        })
    }

    /// Ids of databases currently running normally.
    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases.iter().filter_map(|(database_id, database)| {
            database.running_state().is_some().then_some(*database_id)
        })
    }

    /// In-flight graph info of a database, or `None` if it is unknown or
    /// recovering.
    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
        self.databases
            .get(&database_id)
            .and_then(|database| database.running_state())
            .map(|database| &database.database_info)
    }

    /// Whether any database may still have snapshot-backfilling jobs; a
    /// recovering database conservatively counts as "may have".
    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
        self.databases
            .values()
            .any(|database| database.may_have_snapshot_backfilling_jobs())
    }

    /// Handle a newly scheduled barrier for one database.
    ///
    /// With a command attached this resolves cross-db snapshot backfill
    /// epochs, lazily creates the database control on its first
    /// `CreateStreamingJob`, and rejects/skips commands that make no sense on
    /// a non-existent database. Without a command it injects a plain barrier,
    /// subject to the in-flight barrier limit.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                // Fill each cross-db upstream table's snapshot epoch from the
                // committed epoch of whichever running database owns that
                // upstream job. Missing upstream => fail the job up front.
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                // First command seen for this database: only a normal
                // CreateStreamingJob may bootstrap a new database control.
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        // Register the database-level partial graph before the
                        // first barrier is injected.
                        let adder = partial_graph_manager.add_partial_graph(
                            to_partial_graph_id(database_id, None),
                            DatabaseCheckpointControlMetrics::new(database_id),
                        );
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    // No-op commands on an empty database: report success.
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    // Commands that require an existing database: bug in debug
                    // builds, reported failure otherwise.
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        } else {
            // Command-less (periodic) barrier: silently skip unknown or
            // recovering databases, and apply back-pressure when the in-flight
            // barrier budget is exhausted.
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                return Ok(());
            };
            if partial_graph_manager.inflight_barrier_num(database.partial_graph_id)
                >= self.in_flight_barrier_nums
            {
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                adaptive_parallelism_strategy,
                worker_nodes,
            )
        }
    }

    /// Backfill progress per job across all running databases, including jobs
    /// still being created.
    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_backfill_progress(),
            );
            for (job_id, creating_job) in
                &database_checkpoint_control.creating_streaming_job_controls
            {
                progress.extend([(*job_id, creating_job.gen_backfill_progress())]);
            }
        }
        progress
    }

    /// Fragment-level backfill progress across all running databases,
    /// including jobs still being created.
    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
        let mut progress = Vec::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(
                database_checkpoint_control
                    .database_info
                    .gen_fragment_backfill_progress(),
            );
            for creating_job in database_checkpoint_control
                .creating_streaming_job_controls
                .values()
            {
                progress.extend(creating_job.gen_fragment_backfill_progress());
            }
        }
        progress
    }

    /// CDC progress per job across all running databases.
    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
        let mut progress = HashMap::new();
        for status in self.databases.values() {
            let Some(database_checkpoint_control) = status.running_state() else {
                continue;
            };
            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
        }
        progress
    }

    /// Ids of databases (running or recovering) invalidated by a failure of
    /// `worker_id`; such databases must enter/restart recovery.
    pub(crate) fn databases_failed_at_worker_err(
        &mut self,
        worker_id: WorkerId,
    ) -> impl Iterator<Item = DatabaseId> + '_ {
        self.databases
            .iter_mut()
            .filter_map(
                move |(database_id, database_status)| match database_status {
                    DatabaseCheckpointControlStatus::Running(control) => {
                        if !control.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        if !state.is_valid_after_worker_err(worker_id) {
                            Some(*database_id)
                        } else {
                            None
                        }
                    }
                },
            )
    }
}
457
/// Events yielded by [`CheckpointControl::next_event`] when a recovering
/// database is ready to move to its next recovery stage.
pub(crate) enum CheckpointControlEvent<'a> {
    /// The database is ready to begin initializing (carries the workers to reset).
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// The database has finished recovering and may resume running.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
462
impl CheckpointControl {
    /// Handle a "partial graph reset" acknowledgment from workers.
    ///
    /// For a running database the partial graph must belong to a creating
    /// job (the database-level graph is never reset while running); for a
    /// recovering database the responses are forwarded to the recovery state
    /// machine.
    pub(crate) fn on_partial_graph_reset(
        &mut self,
        partial_graph_id: PartialGraphId,
        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
    ) {
        let (database_id, creating_job) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(database) => {
                if let Some(creating_job_id) = creating_job {
                    let Some(job) = database
                        .creating_streaming_job_controls
                        .remove(&creating_job_id)
                    else {
                        // Bug in debug builds; tolerated with a warning in release.
                        if cfg!(debug_assertions) {
                            panic!(
                                "receive reset partial graph resp on non-existing creating job {creating_job_id} in database {database_id}"
                            )
                        }
                        warn!(
                            %database_id,
                            %creating_job_id,
                            "ignore reset partial graph resp on non-existing creating job on running database"
                        );
                        return;
                    };
                    job.on_partial_graph_reset();
                } else {
                    unreachable!("should not receive reset database resp when database running")
                }
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.on_partial_graph_reset(partial_graph_id, reset_resps);
            }
        }
    }

    /// Notify a recovering database that one of its partial graphs has been
    /// (re-)initialized on the workers. Never expected while running.
    pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
        let (database_id, _) = from_partial_graph_id(partial_graph_id);
        match self.databases.get_mut(&database_id).expect("should exist") {
            DatabaseCheckpointControlStatus::Running(_) => {
                unreachable!("should not have partial graph initialized when running")
            }
            DatabaseCheckpointControlStatus::Recovering(state) => {
                state.partial_graph_initialized(partial_graph_id);
            }
        }
    }

    /// Future resolving to the next recovery-state transition of any
    /// recovering database. Pending while all databases are running.
    ///
    /// `self` is wrapped in an `Option` so the `&mut` borrow held by the
    /// polling loop can be released and the full mutable reference moved into
    /// the `Ready` value; polling again after readiness is a bug.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
}
550
/// Lifecycle of one database's checkpoint control: normal barrier processing
/// or a recovery state machine.
pub(crate) enum DatabaseCheckpointControlStatus {
    Running(DatabaseCheckpointControl),
    Recovering(DatabaseRecoveringState),
}
555
556impl DatabaseCheckpointControlStatus {
557 fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
558 match self {
559 DatabaseCheckpointControlStatus::Running(state) => Some(state),
560 DatabaseCheckpointControlStatus::Recovering(_) => None,
561 }
562 }
563
564 fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
565 match self {
566 DatabaseCheckpointControlStatus::Running(state) => Some(state),
567 DatabaseCheckpointControlStatus::Recovering(_) => None,
568 }
569 }
570
571 fn may_have_snapshot_backfilling_jobs(&self) -> bool {
572 self.running_state()
573 .map(|database| !database.creating_streaming_job_controls.is_empty())
574 .unwrap_or(true) }
576
577 fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
578 match self {
579 DatabaseCheckpointControlStatus::Running(state) => state,
580 DatabaseCheckpointControlStatus::Recovering(_) => {
581 panic!("should be at running: {}", reason)
582 }
583 }
584 }
585}
586
/// Per-database metric handles, labeled by database id.
pub(in crate::barrier) struct DatabaseCheckpointControlMetrics {
    // Histogram of barrier completion latency in seconds.
    barrier_latency: LabelGuardedHistogram,
    // Gauge of barriers currently in flight.
    in_flight_barrier_nums: LabelGuardedIntGauge,
    // Gauge of all barriers (in-flight + collected-but-uncommitted).
    all_barrier_nums: LabelGuardedIntGauge,
}
592
593impl DatabaseCheckpointControlMetrics {
594 pub(in crate::barrier) fn new(database_id: DatabaseId) -> Self {
595 let database_id_str = database_id.to_string();
596 let barrier_latency = GLOBAL_META_METRICS
597 .barrier_latency
598 .with_guarded_label_values(&[&database_id_str]);
599 let in_flight_barrier_nums = GLOBAL_META_METRICS
600 .in_flight_barrier_nums
601 .with_guarded_label_values(&[&database_id_str]);
602 let all_barrier_nums = GLOBAL_META_METRICS
603 .all_barrier_nums
604 .with_guarded_label_values(&[&database_id_str]);
605 Self {
606 barrier_latency,
607 in_flight_barrier_nums,
608 all_barrier_nums,
609 }
610 }
611}
612
613impl PartialGraphStat for DatabaseCheckpointControlMetrics {
614 fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
615 self.barrier_latency.observe(barrier_latency_secs);
616 }
617
618 fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize) {
619 self.in_flight_barrier_nums.set(inflight_barrier_num as _);
620 self.all_barrier_nums
621 .set((inflight_barrier_num + collected_barrier_num) as _);
622 }
623}
624
/// Barrier checkpoint state for a single database.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    pub(super) database_id: DatabaseId,
    // Partial graph id of the database itself (no creating-job component).
    partial_graph_id: PartialGraphId,
    pub(super) state: BarrierWorkerState,

    // Per-epoch collector of creating jobs that must finish before that
    // epoch's barrier may commit, together with their worker responses and
    // tracking handles.
    finishing_jobs_collector:
        BarrierItemCollector<JobId, (Vec<BarrierCompleteResponse>, TrackingJob), ()>,
    // Prev-epoch of the barrier currently being committed, if any; cleared
    // (and promoted to `committed_epoch`) in `ack_completed`.
    completing_barrier: Option<u64>,

    // Last committed prev-epoch; `None` until the first commit completes.
    committed_epoch: Option<u64>,

    pub(super) database_info: InflightDatabaseInfo,
    // Snapshot-backfilling streaming jobs still being created, keyed by job id.
    pub creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
}
642
643impl DatabaseCheckpointControl {
644 fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
645 Self {
646 database_id,
647 partial_graph_id: to_partial_graph_id(database_id, None),
648 state: BarrierWorkerState::new(),
649 finishing_jobs_collector: BarrierItemCollector::new(),
650 completing_barrier: None,
651 committed_epoch: None,
652 database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
653 creating_streaming_job_controls: Default::default(),
654 }
655 }
656
657 pub(crate) fn recovery(
658 database_id: DatabaseId,
659 state: BarrierWorkerState,
660 committed_epoch: u64,
661 database_info: InflightDatabaseInfo,
662 creating_streaming_job_controls: HashMap<JobId, CreatingStreamingJobControl>,
663 ) -> Self {
664 Self {
665 database_id,
666 partial_graph_id: to_partial_graph_id(database_id, None),
667 state,
668 finishing_jobs_collector: BarrierItemCollector::new(),
669 completing_barrier: None,
670 committed_epoch: Some(committed_epoch),
671 database_info,
672 creating_streaming_job_controls,
673 }
674 }
675
676 pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
677 !self.database_info.contains_worker(worker_id as _)
678 && self
679 .creating_streaming_job_controls
680 .values()
681 .all(|job| job.is_valid_after_worker_err(worker_id))
682 }
683
684 fn enqueue_command(&mut self, epoch: EpochPair, creating_jobs_to_wait: HashSet<JobId>) {
686 let prev_epoch = epoch.prev;
687 tracing::trace!(prev_epoch, ?creating_jobs_to_wait, "enqueue command");
688 if !creating_jobs_to_wait.is_empty() {
689 self.finishing_jobs_collector
690 .enqueue(epoch, creating_jobs_to_wait, ());
691 }
692 }
693
694 fn barrier_collected(
697 &mut self,
698 partial_graph_id: PartialGraphId,
699 collected_barrier: CollectedBarrier<'_>,
700 periodic_barriers: &mut PeriodicBarriers,
701 ) -> MetaResult<()> {
702 let prev_epoch = collected_barrier.epoch.prev;
703 tracing::trace!(
704 prev_epoch,
705 partial_graph_id = %partial_graph_id,
706 "barrier collected"
707 );
708 let (database_id, creating_job_id) = from_partial_graph_id(partial_graph_id);
709 assert_eq!(self.database_id, database_id);
710 if let Some(creating_job_id) = creating_job_id {
711 let should_merge_to_upstream = self
712 .creating_streaming_job_controls
713 .get_mut(&creating_job_id)
714 .expect("should exist")
715 .collect(collected_barrier);
716 if should_merge_to_upstream {
717 periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
718 }
719 }
720 Ok(())
721 }
722}
723
impl DatabaseCheckpointControl {
    /// For each creating job, the upstream log epoch it pins together with the
    /// upstream table ids (as reported by `pinned_upstream_log_epoch`).
    fn collect_backfill_pinned_upstream_log_epoch(
        &self,
    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
        self.creating_streaming_job_controls
            .iter()
            .map(|(job_id, creating_job)| (*job_id, creating_job.pinned_upstream_log_epoch()))
            .collect()
    }

    /// Collect `(upstream_fragment, downstream_fragment)` pairs connected by
    /// no-shuffle dispatchers, from both the running graph and creating jobs.
    fn collect_no_shuffle_fragment_relations_for_reschedule_check(
        &self,
    ) -> Vec<(FragmentId, FragmentId)> {
        let mut no_shuffle_relations = Vec::new();
        for fragment in self.database_info.fragment_infos() {
            let downstream_fragment_id = fragment.fragment_id;
            // A Merge node with a NoShuffle upstream dispatcher identifies a
            // no-shuffle edge into this fragment.
            visit_stream_node_cont(&fragment.nodes, |node| {
                if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
                    && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
                {
                    no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
                }
                true
            });
        }

        for creating_job in self.creating_streaming_job_controls.values() {
            creating_job.collect_no_shuffle_fragment_relations(&mut no_shuffle_relations);
        }
        no_shuffle_relations
    }

    /// Jobs that must not be rescheduled while creating jobs are in flight:
    /// seed fragments blocked by creating jobs, expand through connected
    /// no-shuffle graphs, then map fragments back to their owning jobs.
    fn collect_reschedule_blocked_jobs_for_creating_jobs_inflight(
        &self,
    ) -> MetaResult<HashSet<JobId>> {
        let mut initial_blocked_fragment_ids = HashSet::new();
        for creating_job in self.creating_streaming_job_controls.values() {
            creating_job.collect_reschedule_blocked_fragment_ids(&mut initial_blocked_fragment_ids);
        }

        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
        if !initial_blocked_fragment_ids.is_empty() {
            // Any fragment in the same no-shuffle ensemble as a blocked one is
            // blocked too, since no-shuffle groups reschedule as a unit.
            let no_shuffle_relations =
                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
            let (forward_edges, backward_edges) =
                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
            let initial_blocked_fragment_ids: Vec<_> =
                initial_blocked_fragment_ids.iter().copied().collect();
            for ensemble in find_no_shuffle_graphs(
                &initial_blocked_fragment_ids,
                &forward_edges,
                &backward_edges,
            )? {
                blocked_fragment_ids.extend(ensemble.fragments());
            }
        }

        let mut blocked_job_ids = HashSet::new();
        blocked_job_ids.extend(
            blocked_fragment_ids
                .into_iter()
                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
        );
        Ok(blocked_job_ids)
    }

    /// From a concrete reschedule plan, return the subset of affected jobs
    /// that are in `blocked_job_ids`. Affected fragments include the
    /// rescheduled ones plus their upstream dispatchers and downstream
    /// fragments.
    fn collect_reschedule_blocked_job_ids(
        &self,
        reschedules: &HashMap<FragmentId, Reschedule>,
        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
        blocked_job_ids: &HashSet<JobId>,
    ) -> HashSet<JobId> {
        let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
        affected_fragment_ids.extend(fragment_actors.keys().copied());
        for reschedule in reschedules.values() {
            affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
            affected_fragment_ids.extend(
                reschedule
                    .upstream_fragment_dispatcher_ids
                    .iter()
                    .map(|(fragment_id, _)| *fragment_id),
            );
        }

        affected_fragment_ids
            .into_iter()
            .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
            .filter(|job_id| blocked_job_ids.contains(job_id))
            .collect()
    }

    /// Collect work ready to be committed into `task` (shared across
    /// databases by the caller).
    ///
    /// Steps: (1) let creating jobs start completing against the last
    /// committed epoch — finished jobs have their partial graphs removed and
    /// are handed to `finishing_jobs_collector`; (2) try to start completing
    /// the database's own barrier, bounded so it never overtakes an epoch
    /// still waiting on finishing jobs; (3) append creating-job epoch info.
    fn next_complete_barrier_task(
        &mut self,
        periodic_barriers: &mut PeriodicBarriers,
        partial_graph_manager: &mut PartialGraphManager,
        task: &mut Option<CompleteBarrierTask>,
        hummock_version_stats: &HummockVersionStats,
    ) {
        let mut creating_jobs_task = vec![];
        // Creating jobs can only complete once the database has a committed epoch.
        if let Some(committed_epoch) = self.committed_epoch {
            let mut finished_jobs = Vec::new();
            let min_upstream_inflight_barrier = partial_graph_manager
                .first_inflight_barrier(self.partial_graph_id)
                .map(|epoch| epoch.prev);
            for (job_id, job) in &mut self.creating_streaming_job_controls {
                if let Some((epoch, resps, info, is_finish_epoch)) = job.start_completing(
                    partial_graph_manager,
                    min_upstream_inflight_barrier,
                    committed_epoch,
                ) {
                    let resps = resps.into_values().collect_vec();
                    if is_finish_epoch {
                        // A finish epoch carries no notifiers of its own.
                        assert!(info.notifiers.is_empty());
                        finished_jobs.push((*job_id, epoch, resps));
                        continue;
                    };
                    creating_jobs_task.push((*job_id, epoch, resps, info));
                }
            }

            if !finished_jobs.is_empty() {
                // Finished creating jobs no longer need their own partial graphs.
                partial_graph_manager.remove_partial_graphs(
                    finished_jobs
                        .iter()
                        .map(|(job_id, ..)| to_partial_graph_id(self.database_id, Some(*job_id)))
                        .collect(),
                );
            }
            for (job_id, epoch, resps) in finished_jobs {
                debug!(epoch, %job_id, "finish creating job");
                let creating_streaming_job = self
                    .creating_streaming_job_controls
                    .remove(&job_id)
                    .expect("should exist");
                let tracking_job = creating_streaming_job.into_tracking_job();
                // Defer the commit of this job to the database barrier at `epoch`.
                self.finishing_jobs_collector
                    .collect(epoch, job_id, (resps, tracking_job));
            }
        }
        let mut observed_non_checkpoint = false;
        self.finishing_jobs_collector.advance_collected();
        // Never complete past an epoch still waiting for finishing jobs.
        let epoch_end_bound = self
            .finishing_jobs_collector
            .first_inflight_epoch()
            .map_or(Unbounded, |epoch| Excluded(epoch.prev));
        if let Some((epoch, resps, info)) = partial_graph_manager.start_completing(
            self.partial_graph_id,
            epoch_end_bound,
            // NOTE(review): this callback appears to handle barriers that are
            // consumed without producing a commit (it also flags
            // `observed_non_checkpoint`) — confirm against
            // `PartialGraphManager::start_completing`'s contract.
            |_, resps, post_collect_command| {
                observed_non_checkpoint = true;
                self.handle_refresh_table_info(task, &resps);
                self.database_info.apply_collected_command(
                    &post_collect_command,
                    &resps,
                    hummock_version_stats,
                );
            },
        ) {
            self.handle_refresh_table_info(task, &resps);
            self.database_info.apply_collected_command(
                &info.post_collect_command,
                &resps,
                hummock_version_stats,
            );
            let mut resps_to_commit = resps.into_values().collect_vec();
            let mut staging_commit_info = self.database_info.take_staging_commit_info();
            // Fold in any creating jobs that finish exactly at this epoch.
            if let Some((_, finished_jobs, _)) =
                self.finishing_jobs_collector
                    .take_collected_if(|collected_epoch| {
                        assert!(epoch <= collected_epoch.prev);
                        epoch == collected_epoch.prev
                    })
            {
                finished_jobs
                    .into_iter()
                    .for_each(|(_, (resps, tracking_job))| {
                        resps_to_commit.extend(resps);
                        staging_commit_info.finished_jobs.push(tracking_job);
                    });
            }
            {
                let task = task.get_or_insert_default();
                Command::collect_commit_epoch_info(
                    &self.database_info,
                    &info,
                    &mut task.commit_info,
                    resps_to_commit,
                    self.collect_backfill_pinned_upstream_log_epoch(),
                );
                // Remember the epoch in flight; promoted to `committed_epoch`
                // in `ack_completed`.
                self.completing_barrier = Some(info.barrier_info.prev_epoch());
                task.finished_jobs.extend(staging_commit_info.finished_jobs);
                task.finished_cdc_table_backfill
                    .extend(staging_commit_info.finished_cdc_table_backfill);
                task.epoch_infos
                    .try_insert(self.partial_graph_id, info)
                    .expect("non duplicate");
                task.commit_info
                    .truncate_tables
                    .extend(staging_commit_info.table_ids_to_truncate);
            }
        } else if observed_non_checkpoint
            && self.database_info.has_pending_finished_jobs()
            && !partial_graph_manager.has_pending_checkpoint_barrier(self.partial_graph_id)
        {
            // Finished jobs are waiting for a checkpoint barrier and none is
            // pending: force the next barrier to be a checkpoint.
            periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
        }
        if !creating_jobs_task.is_empty() {
            let task = task.get_or_insert_default();
            for (job_id, epoch, resps, info) in creating_jobs_task {
                collect_creating_job_commit_epoch_info(&mut task.commit_info, epoch, resps, &info);
                task.epoch_infos
                    .try_insert(to_partial_graph_id(self.database_id, Some(job_id)), info)
                    .expect("non duplicate");
            }
        }
    }

    /// Acknowledge a completed commit: promote the completing barrier's
    /// prev-epoch to `committed_epoch` and forward per-job acks.
    fn ack_completed(
        &mut self,
        partial_graph_manager: &mut PartialGraphManager,
        command_prev_epoch: Option<u64>,
        creating_job_epochs: Vec<(JobId, u64)>,
    ) {
        {
            if let Some(prev_epoch) = self.completing_barrier.take() {
                // The acked epoch must match the one we marked as completing.
                assert_eq!(command_prev_epoch, Some(prev_epoch));
                self.committed_epoch = Some(prev_epoch);
                partial_graph_manager.ack_completed(self.partial_graph_id, prev_epoch);
            } else {
                assert_eq!(command_prev_epoch, None);
            };
            for (job_id, epoch) in creating_job_epochs {
                // The job may already have finished and been removed.
                if let Some(job) = self.creating_streaming_job_controls.get_mut(&job_id) {
                    job.ack_completed(partial_graph_manager, epoch);
                }
            }
        }
    }

    /// Aggregate list/load-finished source ids and refresh-finished table job
    /// ids from worker barrier responses into `task` (creating it if needed).
    fn handle_refresh_table_info(
        &self,
        task: &mut Option<CompleteBarrierTask>,
        resps: &HashMap<WorkerId, BarrierCompleteResponse>,
    ) {
        let list_finished_info = resps
            .values()
            .flat_map(|resp| resp.list_finished_sources.clone())
            .collect::<Vec<_>>();
        if !list_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.list_finished_source_ids.extend(list_finished_info);
        }

        let load_finished_info = resps
            .values()
            .flat_map(|resp| resp.load_finished_sources.clone())
            .collect::<Vec<_>>();
        if !load_finished_info.is_empty() {
            let task = task.get_or_insert_default();
            task.load_finished_source_ids.extend(load_finished_info);
        }

        let refresh_finished_table_ids: Vec<JobId> = resps
            .values()
            .flat_map(|resp| {
                resp.refresh_finished_tables
                    .iter()
                    .map(|table_id| table_id.as_job_id())
            })
            .collect::<Vec<_>>();
        if !refresh_finished_table_ids.is_empty() {
            let task = task.get_or_insert_default();
            task.refresh_finished_table_job_ids
                .extend(refresh_finished_table_ids);
        }
    }
}
1008
1009impl DatabaseCheckpointControl {
    /// Inject a new barrier into this database's barrier stream, optionally
    /// carrying a `command` together with the `notifiers` awaiting it.
    ///
    /// Several guard cases short-circuit with `Ok(())` *without* injecting a
    /// barrier, after notifying the callers appropriately:
    /// - a multi-job cancel or throttle that touches a creating
    ///   snapshot-backfill job is rejected (notify start-failed);
    /// - a single-job drop of a creating job may be fully absorbed by that
    ///   job's own control;
    /// - a reschedule overlapping creating unreschedulable backfill jobs is
    ///   rejected;
    /// - a non-create command on an empty database is acknowledged immediately
    ///   (notify started + collected);
    /// - creating a snapshot-backfill job while paused is rejected.
    ///
    /// On the happy path the command is applied via `apply_command` (which
    /// consumes the notifiers) and the barrier epoch is enqueued to await
    /// collection. On `apply_command` failure every notifier is failed and the
    /// error is propagated.
    fn handle_new_barrier(
        &mut self,
        command: Option<(Command, Vec<Notifier>)>,
        checkpoint: bool,
        span: tracing::Span,
        partial_graph_manager: &mut PartialGraphManager,
        hummock_version_stats: &HummockVersionStats,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        // Epoch the new barrier will carry: successor of the latest in-flight prev epoch.
        let curr_epoch = self.state.in_flight_prev_epoch().next();

        // Split the command from its notifiers so the notifiers can be
        // consumed independently on each early-return path below.
        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
            (Some(command), notifiers)
        } else {
            (None, vec![])
        };

        // An unresolved reschedule intent (plan still `None`) must never reach
        // injection; resolution happens upstream of this call.
        debug_assert!(
            !matches!(
                &command,
                Some(Command::RescheduleIntent {
                    reschedule_plan: None,
                    ..
                })
            ),
            "reschedule intent should be resolved before injection"
        );

        if let Some(Command::DropStreamingJobs {
            streaming_job_ids, ..
        }) = &command
        {
            if streaming_job_ids.len() > 1 {
                // Multi-job cancel: refuse if any target is a creating
                // snapshot-backfill job — callers must cancel such a job in a
                // single-job DDL. All notifiers are failed and no barrier is
                // injected.
                for job_to_cancel in streaming_job_ids {
                    if self
                        .creating_streaming_job_controls
                        .contains_key(job_to_cancel)
                    {
                        warn!(
                            job_id = %job_to_cancel,
                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
                        );
                        for notifier in notifiers {
                            notifier
                                .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
                                the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
                        }
                        return Ok(());
                    }
                }
            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
                // Single-job drop targeting a creating job: let the creating
                // job's control handle it. `drop` returning true means it
                // fully absorbed the command (and the notifiers), so nothing
                // is injected here.
                && let Some(creating_job) =
                    self.creating_streaming_job_controls.get_mut(job_to_drop)
                && creating_job.drop(&mut notifiers, partial_graph_manager)
            {
                return Ok(());
            }
        }

        // Multi-job throttle touching a creating snapshot-backfill job:
        // rejected the same way as multi-job cancel above.
        // NOTE(review): the failure message reads "will be kept recovery." —
        // looks like a typo for "kept until recovery"; left unchanged since it
        // is a runtime string.
        if let Some(Command::Throttle { jobs, .. }) = &command
            && jobs.len() > 1
            && let Some(creating_job_id) = jobs
                .iter()
                .find(|job| self.creating_streaming_job_controls.contains_key(*job))
        {
            warn!(
                job_id = %creating_job_id,
                "ignore multi-job throttle command on creating snapshot backfill streaming job"
            );
            for notifier in notifiers {
                notifier
                    .notify_start_failed(anyhow!("cannot alter rate limit for snapshot backfill streaming job with other jobs, \
                    the original rate limit will be kept recovery.").into());
            }
            return Ok(());
        };

        // Reschedule guard: while jobs are still creating, reject any
        // reschedule whose fragments overlap jobs blocked by in-flight
        // (unreschedulable) backfill.
        if let Some(Command::RescheduleIntent {
            reschedule_plan: Some(reschedule_plan),
            ..
        }) = &command
            && !self.creating_streaming_job_controls.is_empty()
        {
            let blocked_job_ids =
                self.collect_reschedule_blocked_jobs_for_creating_jobs_inflight()?;
            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
                &reschedule_plan.reschedules,
                &reschedule_plan.fragment_actors,
                &blocked_job_ids,
            );
            if !blocked_reschedule_job_ids.is_empty() {
                warn!(
                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
                );
                for notifier in notifiers {
                    notifier.notify_start_failed(
                        anyhow!(
                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
                            blocked_reschedule_job_ids
                        )
                        .into(),
                    );
                }
                return Ok(());
            }
        }

        // Empty database and not a job creation: there is nothing to inject a
        // barrier into, so acknowledge the command immediately. A snapshot
        // backfill job cannot exist without a normal upstream job, hence the
        // assertion.
        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
            && self.database_info.is_empty()
        {
            assert!(
                self.creating_streaming_job_controls.is_empty(),
                "should not have snapshot backfill job when there is no normal job in database"
            );
            for mut notifier in notifiers {
                notifier.notify_started();
                notifier.notify_collected();
            }
            return Ok(());
        };

        // Snapshot-backfill creation is refused while the stream is paused —
        // presumably because backfill progress depends on barriers flowing;
        // TODO(review): confirm the underlying reason.
        if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = &command
            && self.state.is_paused()
        {
            warn!("cannot create streaming job with snapshot backfill when paused");
            for notifier in notifiers {
                notifier.notify_start_failed(
                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
                        .into(),
                );
            }
            return Ok(());
        }

        // Advance the barrier state and record the new epoch on the tracing span.
        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
        barrier_info.prev_epoch.span().in_scope(|| {
            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
        });
        span.record("epoch", barrier_info.curr_epoch());

        let epoch = barrier_info.epoch();
        // Apply the command; on success `apply_command` has taken ownership of
        // (and drained) the notifiers, which the assert below double-checks.
        let ApplyCommandInfo { jobs_to_wait } = match self.apply_command(
            command,
            &mut notifiers,
            barrier_info,
            partial_graph_manager,
            hummock_version_stats,
            adaptive_parallelism_strategy,
            worker_nodes,
        ) {
            Ok(info) => {
                assert!(notifiers.is_empty());
                info
            }
            Err(err) => {
                // Fail every waiting caller with a clone of the error, then
                // propagate. The fail point lets tests exercise this path.
                for notifier in notifiers {
                    notifier.notify_start_failed(err.clone());
                }
                fail_point!("inject_barrier_err_success");
                return Err(err);
            }
        };

        // Register the injected epoch (and the creating jobs it must wait on)
        // so completion can be tracked when responses are collected.
        self.enqueue_command(epoch, jobs_to_wait);

        Ok(())
    }
1186}