risingwave_meta/barrier/checkpoint/
control.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::future::{Future, poll_fn};
18use std::ops::Bound::{Excluded, Unbounded};
19use std::task::Poll;
20
21use anyhow::anyhow;
22use fail::fail_point;
23use itertools::Itertools;
24use risingwave_common::catalog::{DatabaseId, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
27use risingwave_common::util::epoch::EpochPair;
28use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
29use risingwave_meta_model::WorkerId;
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::{FragmentId, PartialGraphId};
33use risingwave_pb::stream_plan::DispatcherType as PbDispatcherType;
34use risingwave_pb::stream_plan::stream_node::NodeBody;
35use risingwave_pb::stream_service::BarrierCompleteResponse;
36use risingwave_pb::stream_service::streaming_control_stream_response::ResetPartialGraphResponse;
37use tracing::{debug, warn};
38
39use crate::barrier::cdc_progress::CdcProgress;
40use crate::barrier::checkpoint::independent_job::IndependentCheckpointJobControl;
41use crate::barrier::checkpoint::recovery::{
42    DatabaseRecoveringState, DatabaseStatusAction, EnterInitializing, EnterRunning,
43    RecoveringStateAction,
44};
45use crate::barrier::checkpoint::state::{ApplyCommandInfo, BarrierWorkerState};
46use crate::barrier::complete_task::{BarrierCompleteOutput, CompleteBarrierTask};
47use crate::barrier::info::{InflightDatabaseInfo, SharedActorInfos};
48use crate::barrier::notifier::Notifier;
49use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphManager, PartialGraphStat};
50use crate::barrier::progress::TrackingJob;
51use crate::barrier::rpc::{from_partial_graph_id, to_partial_graph_id};
52use crate::barrier::schedule::{NewBarrier, PeriodicBarriers};
53use crate::barrier::utils::{BarrierItemCollector, collect_independent_job_commit_epoch_info};
54use crate::barrier::{
55    BackfillProgress, Command, CreateStreamingJobType, FragmentBackfillProgress, Reschedule,
56};
57use crate::controller::fragment::InflightFragmentInfo;
58use crate::controller::scale::{build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs};
59use crate::manager::MetaSrvEnv;
60
61fn fragment_has_online_unreschedulable_scan(fragment: &InflightFragmentInfo) -> bool {
62    let mut has_unreschedulable_scan = false;
63    visit_stream_node_cont(&fragment.nodes, |node| {
64        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
65            let scan_type = stream_scan.stream_scan_type();
66            if !scan_type.is_reschedulable(true) {
67                has_unreschedulable_scan = true;
68                return false;
69            }
70        }
71        true
72    });
73    has_unreschedulable_scan
74}
75
76fn collect_fragment_upstream_fragment_ids(
77    fragment: &InflightFragmentInfo,
78    upstream_fragment_ids: &mut HashSet<FragmentId>,
79) {
80    visit_stream_node_cont(&fragment.nodes, |node| {
81        if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref() {
82            upstream_fragment_ids.insert(merge.upstream_fragment_id);
83        }
84        true
85    });
86}
87
88use crate::model::ActorId;
89use crate::rpc::metrics::GLOBAL_META_METRICS;
90use crate::{MetaError, MetaResult};
91
/// Top-level checkpoint control state, holding the checkpoint status of every known database.
pub(crate) struct CheckpointControl {
    /// Meta service environment; read here for `opts` and the shared actor infos.
    pub(crate) env: MetaSrvEnv,
    /// Per-database status, either running or recovering.
    pub(super) databases: HashMap<DatabaseId, DatabaseCheckpointControlStatus>,
    /// Hummock version stats, replaced in `ack_completed` from the barrier complete output.
    pub(super) hummock_version_stats: HummockVersionStats,
    /// The max barrier nums in flight
    pub(crate) in_flight_barrier_nums: usize,
}
99
100impl CheckpointControl {
101    pub fn new(env: MetaSrvEnv) -> Self {
102        Self {
103            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
104            env,
105            databases: Default::default(),
106            hummock_version_stats: Default::default(),
107        }
108    }
109
110    pub(crate) fn recover(
111        databases: HashMap<DatabaseId, DatabaseCheckpointControl>,
112        failed_databases: HashMap<DatabaseId, HashSet<PartialGraphId>>, /* `database_id` -> set of resetting partial graph ids */
113        hummock_version_stats: HummockVersionStats,
114        env: MetaSrvEnv,
115    ) -> Self {
116        env.shared_actor_infos()
117            .retain_databases(databases.keys().chain(failed_databases.keys()).cloned());
118        Self {
119            in_flight_barrier_nums: env.opts.in_flight_barrier_nums,
120            env,
121            databases: databases
122                .into_iter()
123                .map(|(database_id, control)| {
124                    (
125                        database_id,
126                        DatabaseCheckpointControlStatus::Running(control),
127                    )
128                })
129                .chain(failed_databases.into_iter().map(
130                    |(database_id, resetting_partial_graphs)| {
131                        (
132                            database_id,
133                            DatabaseCheckpointControlStatus::Recovering(
134                                DatabaseRecoveringState::new_resetting(
135                                    database_id,
136                                    resetting_partial_graphs,
137                                ),
138                            ),
139                        )
140                    },
141                ))
142                .collect(),
143            hummock_version_stats,
144        }
145    }
146
147    pub(crate) fn ack_completed(
148        &mut self,
149        partial_graph_manager: &mut PartialGraphManager,
150        output: BarrierCompleteOutput,
151    ) {
152        self.hummock_version_stats = output.hummock_version_stats;
153        for (database_id, (command_prev_epoch, independent_job_epochs)) in output.epochs_to_ack {
154            self.databases
155                .get_mut(&database_id)
156                .expect("should exist")
157                .expect_running("should have wait for completing command before enter recovery")
158                .ack_completed(
159                    partial_graph_manager,
160                    command_prev_epoch,
161                    independent_job_epochs,
162                );
163        }
164    }
165
166    pub(crate) fn next_complete_barrier_task(
167        &mut self,
168        periodic_barriers: &mut PeriodicBarriers,
169        partial_graph_manager: &mut PartialGraphManager,
170    ) -> Option<CompleteBarrierTask> {
171        let mut task = None;
172        for database in self.databases.values_mut() {
173            let Some(database) = database.running_state_mut() else {
174                continue;
175            };
176            database.next_complete_barrier_task(
177                periodic_barriers,
178                partial_graph_manager,
179                &mut task,
180                &self.hummock_version_stats,
181            );
182        }
183        task
184    }
185
186    pub(crate) fn barrier_collected(
187        &mut self,
188        partial_graph_id: PartialGraphId,
189        collected_barrier: CollectedBarrier<'_>,
190        periodic_barriers: &mut PeriodicBarriers,
191    ) -> MetaResult<()> {
192        let (database_id, _) = from_partial_graph_id(partial_graph_id);
193        let database_status = self.databases.get_mut(&database_id).expect("should exist");
194        match database_status {
195            DatabaseCheckpointControlStatus::Running(database) => {
196                database.barrier_collected(partial_graph_id, collected_barrier, periodic_barriers)
197            }
198            DatabaseCheckpointControlStatus::Recovering(_) => {
199                if cfg!(debug_assertions) {
200                    panic!(
201                        "receive collected barrier {:?} on recovering database {} from partial graph {}",
202                        collected_barrier, database_id, partial_graph_id
203                    );
204                } else {
205                    warn!(?collected_barrier, %partial_graph_id, "ignore collected barrier on recovering database");
206                }
207                Ok(())
208            }
209        }
210    }
211
212    pub(crate) fn recovering_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
213        self.databases.iter().filter_map(|(database_id, database)| {
214            database.running_state().is_none().then_some(*database_id)
215        })
216    }
217
218    pub(crate) fn running_databases(&self) -> impl Iterator<Item = DatabaseId> + '_ {
219        self.databases.iter().filter_map(|(database_id, database)| {
220            database.running_state().is_some().then_some(*database_id)
221        })
222    }
223
224    pub(crate) fn database_info(&self, database_id: DatabaseId) -> Option<&InflightDatabaseInfo> {
225        self.databases
226            .get(&database_id)
227            .and_then(|database| database.running_state())
228            .map(|database| &database.database_info)
229    }
230
231    pub(crate) fn may_have_snapshot_backfilling_jobs(&self) -> bool {
232        self.databases
233            .values()
234            .any(|database| database.may_have_snapshot_backfilling_jobs())
235    }
236
    /// Handles a newly scheduled barrier for one database.
    ///
    /// With a command:
    /// - for `CreateStreamingJob`, each cross-db upstream table's snapshot epoch is filled from
    ///   the committed epoch of the running database containing that job; if an epoch is still
    ///   unset afterwards, the notifiers are failed and the barrier is dropped;
    /// - an existing database must be running (panics otherwise);
    /// - a missing database is created only for a `CreateStreamingJob` of type `Normal`;
    ///   `Flush`/`Pause`/`Resume`/`DropStreamingJobs`/`DropSubscription` are acknowledged as
    ///   started and collected without injecting anything; any other command panics in debug
    ///   builds and fails the notifiers in release builds.
    ///
    /// Without a command, the barrier is skipped when the database does not exist, is not
    /// running, or already has `in_flight_barrier_nums` barriers in flight.
    ///
    /// Returns the result of the per-database `handle_new_barrier`; every skipped or rejected
    /// barrier returns `Ok(())`.
    pub(crate) fn handle_new_barrier(
        &mut self,
        new_barrier: NewBarrier,
        partial_graph_manager: &mut PartialGraphManager,
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
    ) -> MetaResult<()> {
        let NewBarrier {
            database_id,
            command,
            span,
            checkpoint,
        } = new_barrier;

        if let Some((mut command, notifiers)) = command {
            // Resolve the snapshot epoch of every cross-db upstream table before dispatching.
            if let &mut Command::CreateStreamingJob {
                ref mut cross_db_snapshot_backfill_info,
                ref info,
                ..
            } = &mut command
            {
                for (table_id, snapshot_epoch) in
                    &mut cross_db_snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    // Search the running databases for the one containing the upstream job.
                    for database in self.databases.values() {
                        if let Some(database) = database.running_state()
                            && database.database_info.contains_job(table_id.as_job_id())
                        {
                            if let Some(committed_epoch) = database.committed_epoch {
                                *snapshot_epoch = Some(committed_epoch);
                            }
                            break;
                        }
                    }
                    // Still unset: no running database contains the job, or the one that does
                    // has no committed epoch yet.
                    if snapshot_epoch.is_none() {
                        let table_id = *table_id;
                        warn!(
                            ?cross_db_snapshot_backfill_info,
                            ?table_id,
                            ?info,
                            "database of cross db upstream table not found"
                        );
                        let err: MetaError =
                            anyhow!("database of cross db upstream table {} not found", table_id)
                                .into();
                        for notifier in notifiers {
                            notifier.notify_start_failed(err.clone());
                        }

                        return Ok(());
                    }
                }
            }

            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry
                    .into_mut()
                    .expect_running("should not have command when not running"),
                // The database has no entry yet: only a normal streaming job may create it.
                Entry::Vacant(entry) => match &command {
                    Command::CreateStreamingJob { info, job_type, .. } => {
                        let CreateStreamingJobType::Normal = job_type else {
                            if cfg!(debug_assertions) {
                                panic!(
                                    "unexpected first job of type {job_type:?} with info {info:?}"
                                );
                            } else {
                                for notifier in notifiers {
                                    notifier.notify_start_failed(anyhow!("unexpected job_type {job_type:?} for first job {} in database {database_id}", info.streaming_job.id()).into());
                                }
                                return Ok(());
                            }
                        };
                        let new_database = DatabaseCheckpointControl::new(
                            database_id,
                            self.env.shared_actor_infos().clone(),
                        );
                        // Register the database-level partial graph (no independent job id).
                        let adder = partial_graph_manager.add_partial_graph(
                            to_partial_graph_id(database_id, None),
                            DatabaseCheckpointControlMetrics::new(database_id),
                        );
                        adder.added();
                        entry
                            .insert(DatabaseCheckpointControlStatus::Running(new_database))
                            .expect_running("just initialized as running")
                    }
                    // These commands are acknowledged immediately on an empty database.
                    Command::Flush
                    | Command::Pause
                    | Command::Resume
                    | Command::DropStreamingJobs { .. }
                    | Command::DropSubscription { .. } => {
                        for mut notifier in notifiers {
                            notifier.notify_started();
                            notifier.notify_collected();
                        }
                        warn!(?command, "skip command for empty database");
                        return Ok(());
                    }
                    // Every other command requires an existing database.
                    Command::RescheduleIntent { .. }
                    | Command::ReplaceStreamJob(_)
                    | Command::SourceChangeSplit(_)
                    | Command::Throttle { .. }
                    | Command::CreateSubscription { .. }
                    | Command::AlterSubscriptionRetention { .. }
                    | Command::ConnectorPropsChange(_)
                    | Command::Refresh { .. }
                    | Command::ListFinish { .. }
                    | Command::LoadFinish { .. }
                    | Command::ResetSource { .. }
                    | Command::ResumeBackfill { .. }
                    | Command::InjectSourceOffsets { .. } => {
                        if cfg!(debug_assertions) {
                            panic!(
                                "new database graph info can only be created for normal creating streaming job, but get command: {} {:?}",
                                database_id, command
                            )
                        } else {
                            warn!(%database_id, ?command, "database not exist when handling command");
                            for notifier in notifiers {
                                notifier.notify_start_failed(anyhow!("database {database_id} not exist when handling command {command:?}").into());
                            }
                            return Ok(());
                        }
                    }
                },
            };

            database.handle_new_barrier(
                Some((command, notifiers)),
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                worker_nodes,
            )
        } else {
            let database = match self.databases.entry(database_id) {
                Entry::Occupied(entry) => entry.into_mut(),
                Entry::Vacant(_) => {
                    // If it does not exist in the HashMap yet, it means that the first streaming
                    // job has not been created, and we do not need to send a barrier.
                    return Ok(());
                }
            };
            let Some(database) = database.running_state_mut() else {
                // Skip new barrier for database which is not running.
                return Ok(());
            };
            if partial_graph_manager.inflight_barrier_num(database.partial_graph_id)
                >= self.in_flight_barrier_nums
            {
                // Skip new barrier with no explicit command when the database should pause inject additional barrier
                return Ok(());
            }
            database.handle_new_barrier(
                None,
                checkpoint,
                span,
                partial_graph_manager,
                &self.hummock_version_stats,
                worker_nodes,
            )
        }
    }
400
401    pub(crate) fn gen_backfill_progress(&self) -> HashMap<JobId, BackfillProgress> {
402        let mut progress = HashMap::new();
403        for status in self.databases.values() {
404            let Some(database_checkpoint_control) = status.running_state() else {
405                continue;
406            };
407            // Progress of normal backfill
408            progress.extend(
409                database_checkpoint_control
410                    .database_info
411                    .gen_backfill_progress(),
412            );
413            // Progress of independent checkpoint jobs
414            for (job_id, job) in &database_checkpoint_control.independent_checkpoint_job_controls {
415                if let Some(p) = job.gen_backfill_progress() {
416                    progress.insert(*job_id, p);
417                }
418            }
419        }
420        progress
421    }
422
423    pub(crate) fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
424        let mut progress = Vec::new();
425        for status in self.databases.values() {
426            let Some(database_checkpoint_control) = status.running_state() else {
427                continue;
428            };
429            progress.extend(
430                database_checkpoint_control
431                    .database_info
432                    .gen_fragment_backfill_progress(),
433            );
434            for job in database_checkpoint_control
435                .independent_checkpoint_job_controls
436                .values()
437            {
438                progress.extend(job.gen_fragment_backfill_progress());
439            }
440        }
441        progress
442    }
443
444    pub(crate) fn gen_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
445        let mut progress = HashMap::new();
446        for status in self.databases.values() {
447            let Some(database_checkpoint_control) = status.running_state() else {
448                continue;
449            };
450            // Progress of normal backfill
451            progress.extend(database_checkpoint_control.database_info.gen_cdc_progress());
452        }
453        progress
454    }
455
456    pub(crate) fn databases_failed_at_worker_err(
457        &mut self,
458        worker_id: WorkerId,
459    ) -> impl Iterator<Item = DatabaseId> + '_ {
460        self.databases
461            .iter_mut()
462            .filter_map(
463                move |(database_id, database_status)| match database_status {
464                    DatabaseCheckpointControlStatus::Running(control) => {
465                        if !control.is_valid_after_worker_err(worker_id) {
466                            Some(*database_id)
467                        } else {
468                            None
469                        }
470                    }
471                    DatabaseCheckpointControlStatus::Recovering(state) => {
472                        if !state.is_valid_after_worker_err(worker_id) {
473                            Some(*database_id)
474                        } else {
475                            None
476                        }
477                    }
478                },
479            )
480    }
481}
482
/// Event resolved by `CheckpointControl::next_event` when a recovering database is ready to
/// change its status.
pub(crate) enum CheckpointControlEvent<'a> {
    /// Produced when the recovering state yields `RecoveringStateAction::EnterInitializing`.
    EnteringInitializing(DatabaseStatusAction<'a, EnterInitializing>),
    /// Produced when the recovering state yields `RecoveringStateAction::EnterRunning`.
    EnteringRunning(DatabaseStatusAction<'a, EnterRunning>),
}
487
488impl CheckpointControl {
489    pub(crate) fn on_partial_graph_reset(
490        &mut self,
491        partial_graph_id: PartialGraphId,
492        reset_resps: HashMap<WorkerId, ResetPartialGraphResponse>,
493    ) {
494        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
495        match self.databases.get_mut(&database_id).expect("should exist") {
496            DatabaseCheckpointControlStatus::Running(database) => {
497                if let Some(independent_job_id) = independent_job_id {
498                    match database
499                        .independent_checkpoint_job_controls
500                        .remove(&independent_job_id)
501                    {
502                        Some(independent_job) => {
503                            independent_job.on_partial_graph_reset();
504                        }
505                        None => {
506                            if cfg!(debug_assertions) {
507                                panic!(
508                                    "receive reset partial graph resp on non-existing independent job {independent_job_id} in database {database_id}"
509                                )
510                            }
511                            warn!(
512                                %database_id,
513                                %independent_job_id,
514                                "ignore reset partial graph resp on non-existing independent job on running database"
515                            );
516                        }
517                    }
518                } else {
519                    unreachable!("should not receive reset database resp when database running")
520                }
521            }
522            DatabaseCheckpointControlStatus::Recovering(state) => {
523                state.on_partial_graph_reset(partial_graph_id, reset_resps);
524            }
525        }
526    }
527
528    pub(crate) fn on_partial_graph_initialized(&mut self, partial_graph_id: PartialGraphId) {
529        let (database_id, _) = from_partial_graph_id(partial_graph_id);
530        match self.databases.get_mut(&database_id).expect("should exist") {
531            DatabaseCheckpointControlStatus::Running(_) => {
532                unreachable!("should not have partial graph initialized when running")
533            }
534            DatabaseCheckpointControlStatus::Recovering(state) => {
535                state.partial_graph_initialized(partial_graph_id);
536            }
537        }
538    }
539
    /// Returns a future that resolves with the next status-change event of a recovering
    /// database.
    ///
    /// Each poll visits every database and polls the recovering ones via `poll_next_event`;
    /// running databases are skipped. The first ready action is turned into a
    /// [`CheckpointControlEvent`]. The future stays pending while no recovering database is
    /// ready.
    ///
    /// # Panics
    ///
    /// Panics if the future is polled again after it has returned `Poll::Ready`.
    pub(crate) fn next_event(
        &mut self,
    ) -> impl Future<Output = CheckpointControlEvent<'_>> + Send + '_ {
        // Held in an `Option` so the `&mut Self` can be moved out exactly once, when an event
        // is ready.
        let mut this = Some(self);
        poll_fn(move |cx| {
            let Some(this_mut) = this.as_mut() else {
                unreachable!("should not be polled after poll ready")
            };
            for (&database_id, database_status) in &mut this_mut.databases {
                match database_status {
                    DatabaseCheckpointControlStatus::Running(_) => {}
                    DatabaseCheckpointControlStatus::Recovering(state) => {
                        let poll_result = state.poll_next_event(cx);
                        if let Poll::Ready(action) = poll_result {
                            // Take the `&mut Self` out so the status action can be built
                            // from it.
                            let this = this.take().expect("checked Some");
                            return Poll::Ready(match action {
                                RecoveringStateAction::EnterInitializing(reset_workers) => {
                                    CheckpointControlEvent::EnteringInitializing(
                                        this.new_database_status_action(
                                            database_id,
                                            EnterInitializing(reset_workers),
                                        ),
                                    )
                                }
                                RecoveringStateAction::EnterRunning => {
                                    CheckpointControlEvent::EnteringRunning(
                                        this.new_database_status_action(database_id, EnterRunning),
                                    )
                                }
                            });
                        }
                    }
                }
            }
            Poll::Pending
        })
    }
577}
578
/// Checkpoint status of a single database.
pub(crate) enum DatabaseCheckpointControlStatus {
    /// The database is running and owns a live [`DatabaseCheckpointControl`].
    Running(DatabaseCheckpointControl),
    /// The database is recovering; its progress is tracked by [`DatabaseRecoveringState`].
    Recovering(DatabaseRecoveringState),
}
583
584impl DatabaseCheckpointControlStatus {
585    fn running_state(&self) -> Option<&DatabaseCheckpointControl> {
586        match self {
587            DatabaseCheckpointControlStatus::Running(state) => Some(state),
588            DatabaseCheckpointControlStatus::Recovering(_) => None,
589        }
590    }
591
592    fn running_state_mut(&mut self) -> Option<&mut DatabaseCheckpointControl> {
593        match self {
594            DatabaseCheckpointControlStatus::Running(state) => Some(state),
595            DatabaseCheckpointControlStatus::Recovering(_) => None,
596        }
597    }
598
599    fn may_have_snapshot_backfilling_jobs(&self) -> bool {
600        self.running_state()
601            .map(|database| {
602                database
603                    .independent_checkpoint_job_controls
604                    .values()
605                    .any(|job| job.is_snapshot_backfilling())
606            })
607            .unwrap_or(true) // there can be snapshot backfilling jobs when the database is recovering.
608    }
609
610    fn expect_running(&mut self, reason: &'static str) -> &mut DatabaseCheckpointControl {
611        match self {
612            DatabaseCheckpointControlStatus::Running(state) => state,
613            DatabaseCheckpointControlStatus::Recovering(_) => {
614                panic!("should be at running: {}", reason)
615            }
616        }
617    }
618}
619
/// Per-database barrier metrics, each labelled with the database id.
pub(in crate::barrier) struct DatabaseCheckpointControlMetrics {
    /// Histogram of observed barrier latency, in seconds.
    barrier_latency: LabelGuardedHistogram,
    /// Gauge of the number of in-flight barriers.
    in_flight_barrier_nums: LabelGuardedIntGauge,
    /// Gauge of the number of in-flight plus collected barriers.
    all_barrier_nums: LabelGuardedIntGauge,
}
625
626impl DatabaseCheckpointControlMetrics {
627    pub(in crate::barrier) fn new(database_id: DatabaseId) -> Self {
628        let database_id_str = database_id.to_string();
629        let barrier_latency = GLOBAL_META_METRICS
630            .barrier_latency
631            .with_guarded_label_values(&[&database_id_str]);
632        let in_flight_barrier_nums = GLOBAL_META_METRICS
633            .in_flight_barrier_nums
634            .with_guarded_label_values(&[&database_id_str]);
635        let all_barrier_nums = GLOBAL_META_METRICS
636            .all_barrier_nums
637            .with_guarded_label_values(&[&database_id_str]);
638        Self {
639            barrier_latency,
640            in_flight_barrier_nums,
641            all_barrier_nums,
642        }
643    }
644}
645
646impl PartialGraphStat for DatabaseCheckpointControlMetrics {
647    fn observe_barrier_latency(&self, _epoch: EpochPair, barrier_latency_secs: f64) {
648        self.barrier_latency.observe(barrier_latency_secs);
649    }
650
651    fn observe_barrier_num(&self, inflight_barrier_num: usize, collected_barrier_num: usize) {
652        self.in_flight_barrier_nums.set(inflight_barrier_num as _);
653        self.all_barrier_nums
654            .set((inflight_barrier_num + collected_barrier_num) as _);
655    }
656}
657
/// Controls the concurrent execution of commands.
pub(in crate::barrier) struct DatabaseCheckpointControl {
    /// Id of the database this control belongs to.
    pub(super) database_id: DatabaseId,
    /// Partial graph id of the database itself, i.e. `to_partial_graph_id(database_id, None)`.
    partial_graph_id: PartialGraphId,
    pub(super) state: BarrierWorkerState,

    /// Collector filled by `enqueue_command` with the set of independent jobs passed for each
    /// barrier epoch.
    finishing_jobs_collector:
        BarrierItemCollector<JobId, (Vec<BarrierCompleteResponse>, TrackingJob), ()>,
    /// The barrier that is completing.
    /// Some(`prev_epoch`)
    completing_barrier: Option<u64>,

    /// `None` for a newly created database; `Some` when the control is built via `recovery`.
    committed_epoch: Option<u64>,

    /// In-flight info of the database.
    pub(super) database_info: InflightDatabaseInfo,
    /// Independent checkpoint jobs of this database, keyed by job id.
    pub independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
}
675
676impl DatabaseCheckpointControl {
677    fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
678        Self {
679            database_id,
680            partial_graph_id: to_partial_graph_id(database_id, None),
681            state: BarrierWorkerState::new(),
682            finishing_jobs_collector: BarrierItemCollector::new(),
683            completing_barrier: None,
684            committed_epoch: None,
685            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
686            independent_checkpoint_job_controls: Default::default(),
687        }
688    }
689
690    pub(crate) fn recovery(
691        database_id: DatabaseId,
692        state: BarrierWorkerState,
693        committed_epoch: u64,
694        database_info: InflightDatabaseInfo,
695        independent_checkpoint_job_controls: HashMap<JobId, IndependentCheckpointJobControl>,
696    ) -> Self {
697        Self {
698            database_id,
699            partial_graph_id: to_partial_graph_id(database_id, None),
700            state,
701            finishing_jobs_collector: BarrierItemCollector::new(),
702            completing_barrier: None,
703            committed_epoch: Some(committed_epoch),
704            database_info,
705            independent_checkpoint_job_controls,
706        }
707    }
708
709    pub(crate) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
710        !self.database_info.contains_worker(worker_id as _)
711            && self
712                .independent_checkpoint_job_controls
713                .values()
714                .all(|job| {
715                    job.fragment_infos()
716                        .map(|fragment_infos| {
717                            !InflightFragmentInfo::contains_worker(
718                                fragment_infos.values(),
719                                worker_id,
720                            )
721                        })
722                        .unwrap_or(true)
723                })
724    }
725
726    /// Enqueue a barrier command
727    fn enqueue_command(&mut self, epoch: EpochPair, independent_jobs_to_wait: HashSet<JobId>) {
728        let prev_epoch = epoch.prev;
729        tracing::trace!(prev_epoch, ?independent_jobs_to_wait, "enqueue command");
730        if !independent_jobs_to_wait.is_empty() {
731            self.finishing_jobs_collector
732                .enqueue(epoch, independent_jobs_to_wait, ());
733        }
734    }
735
736    /// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes
737    /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them.
738    fn barrier_collected(
739        &mut self,
740        partial_graph_id: PartialGraphId,
741        collected_barrier: CollectedBarrier<'_>,
742        periodic_barriers: &mut PeriodicBarriers,
743    ) -> MetaResult<()> {
744        let prev_epoch = collected_barrier.epoch.prev;
745        tracing::trace!(
746            prev_epoch,
747            partial_graph_id = %partial_graph_id,
748            "barrier collected"
749        );
750        let (database_id, independent_job_id) = from_partial_graph_id(partial_graph_id);
751        assert_eq!(self.database_id, database_id);
752        if let Some(independent_job_id) = independent_job_id {
753            let job = self
754                .independent_checkpoint_job_controls
755                .get_mut(&independent_job_id)
756                .expect("should exist");
757            let should_force_checkpoint = job.collect(collected_barrier);
758            if should_force_checkpoint {
759                periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
760            }
761        }
762        Ok(())
763    }
764}
765
766impl DatabaseCheckpointControl {
767    /// return creating job table fragment id -> (backfill progress epoch , {`upstream_mv_table_id`})
768    fn collect_backfill_pinned_upstream_log_epoch(
769        &self,
770    ) -> HashMap<JobId, (u64, HashSet<TableId>)> {
771        self.independent_checkpoint_job_controls
772            .iter()
773            .map(|(job_id, job)| (*job_id, job.pinned_upstream_log_epoch()))
774            .collect()
775    }
776
777    fn collect_no_shuffle_fragment_relations_for_reschedule_check(
778        &self,
779    ) -> Vec<(FragmentId, FragmentId)> {
780        let mut no_shuffle_relations = Vec::new();
781        for fragment in self.database_info.fragment_infos() {
782            let downstream_fragment_id = fragment.fragment_id;
783            visit_stream_node_cont(&fragment.nodes, |node| {
784                if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
785                    && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
786                {
787                    no_shuffle_relations.push((merge.upstream_fragment_id, downstream_fragment_id));
788                }
789                true
790            });
791        }
792
793        for job in self.independent_checkpoint_job_controls.values() {
794            if let Some(fragment_infos) = job.fragment_infos() {
795                for fragment_info in fragment_infos.values() {
796                    let downstream_fragment_id = fragment_info.fragment_id;
797                    visit_stream_node_cont(&fragment_info.nodes, |node| {
798                        if let Some(NodeBody::Merge(merge)) = node.node_body.as_ref()
799                            && merge.upstream_dispatcher_type == PbDispatcherType::NoShuffle as i32
800                        {
801                            no_shuffle_relations
802                                .push((merge.upstream_fragment_id, downstream_fragment_id));
803                        }
804                        true
805                    });
806                }
807            }
808        }
809        no_shuffle_relations
810    }
811
812    fn collect_reschedule_blocked_jobs_for_independent_jobs_inflight(
813        &self,
814    ) -> MetaResult<HashSet<JobId>> {
815        let mut initial_blocked_fragment_ids = HashSet::new();
816        for job in self.independent_checkpoint_job_controls.values() {
817            if let Some(fragment_infos) = job.fragment_infos() {
818                for fragment_info in fragment_infos.values() {
819                    if fragment_has_online_unreschedulable_scan(fragment_info) {
820                        initial_blocked_fragment_ids.insert(fragment_info.fragment_id);
821                        collect_fragment_upstream_fragment_ids(
822                            fragment_info,
823                            &mut initial_blocked_fragment_ids,
824                        );
825                    }
826                }
827            }
828        }
829
830        let mut blocked_fragment_ids = initial_blocked_fragment_ids.clone();
831        if !initial_blocked_fragment_ids.is_empty() {
832            let no_shuffle_relations =
833                self.collect_no_shuffle_fragment_relations_for_reschedule_check();
834            let (forward_edges, backward_edges) =
835                build_no_shuffle_fragment_graph_edges(no_shuffle_relations);
836            let initial_blocked_fragment_ids: Vec<_> =
837                initial_blocked_fragment_ids.iter().copied().collect();
838            for ensemble in find_no_shuffle_graphs(
839                &initial_blocked_fragment_ids,
840                &forward_edges,
841                &backward_edges,
842            )? {
843                blocked_fragment_ids.extend(ensemble.fragments());
844            }
845        }
846
847        let mut blocked_job_ids = HashSet::new();
848        blocked_job_ids.extend(
849            blocked_fragment_ids
850                .into_iter()
851                .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id)),
852        );
853        Ok(blocked_job_ids)
854    }
855
856    fn collect_reschedule_blocked_job_ids(
857        &self,
858        reschedules: &HashMap<FragmentId, Reschedule>,
859        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
860        blocked_job_ids: &HashSet<JobId>,
861    ) -> HashSet<JobId> {
862        let mut affected_fragment_ids: HashSet<FragmentId> = reschedules.keys().copied().collect();
863        affected_fragment_ids.extend(fragment_actors.keys().copied());
864        for reschedule in reschedules.values() {
865            affected_fragment_ids.extend(reschedule.downstream_fragment_ids.iter().copied());
866            affected_fragment_ids.extend(
867                reschedule
868                    .upstream_fragment_dispatcher_ids
869                    .iter()
870                    .map(|(fragment_id, _)| *fragment_id),
871            );
872        }
873
874        affected_fragment_ids
875            .into_iter()
876            .filter_map(|fragment_id| self.database_info.job_id_by_fragment(fragment_id))
877            .filter(|job_id| blocked_job_ids.contains(job_id))
878            .collect()
879    }
880
881    fn next_complete_barrier_task(
882        &mut self,
883        periodic_barriers: &mut PeriodicBarriers,
884        partial_graph_manager: &mut PartialGraphManager,
885        task: &mut Option<CompleteBarrierTask>,
886        hummock_version_stats: &HummockVersionStats,
887    ) {
888        // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough
889        let mut independent_jobs_task = vec![];
890        if let Some(committed_epoch) = self.committed_epoch {
891            // `Vec::new` is a const fn, and do not have memory allocation, and therefore is lightweight enough
892            let mut finished_jobs = Vec::new();
893            let min_upstream_inflight_barrier = partial_graph_manager
894                .first_inflight_barrier(self.partial_graph_id)
895                .map(|epoch| epoch.prev);
896            for (job_id, job) in &mut self.independent_checkpoint_job_controls {
897                match job {
898                    IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) => {
899                        if let Some((epoch, resps, info, is_finish_epoch)) = creating_job
900                            .start_completing(
901                                partial_graph_manager,
902                                min_upstream_inflight_barrier,
903                                committed_epoch,
904                            )
905                        {
906                            let resps = resps.into_values().collect_vec();
907                            if is_finish_epoch {
908                                assert!(info.notifiers.is_empty());
909                                finished_jobs.push((*job_id, epoch, resps));
910                                continue;
911                            };
912                            independent_jobs_task.push((*job_id, epoch, resps, info));
913                        }
914                    }
915                    IndependentCheckpointJobControl::BatchRefresh(batch_refresh_job) => {
916                        if let Some((epoch, resps, info, tracking_job)) =
917                            batch_refresh_job.start_completing(partial_graph_manager)
918                        {
919                            let resps = resps.into_values().collect_vec();
920                            if let Some(tracking_job) = tracking_job {
921                                let task = task.get_or_insert_default();
922                                task.finished_jobs.push(tracking_job);
923                            }
924                            independent_jobs_task.push((*job_id, epoch, resps, info));
925                        }
926                    }
927                }
928            }
929            if !finished_jobs.is_empty() {
930                partial_graph_manager.remove_partial_graphs(
931                    finished_jobs
932                        .iter()
933                        .map(|(job_id, ..)| to_partial_graph_id(self.database_id, Some(*job_id)))
934                        .collect(),
935                );
936            }
937            for (job_id, epoch, resps) in finished_jobs {
938                debug!(epoch, %job_id, "finish creating job");
939                // It's safe to remove the creating job, because on CompleteJobType::Finished,
940                // all previous barriers have been collected and completed.
941                let Some(IndependentCheckpointJobControl::CreatingStreamingJob(
942                    creating_streaming_job,
943                )) = self.independent_checkpoint_job_controls.remove(&job_id)
944                else {
945                    panic!("finished job {job_id} should be a creating streaming job");
946                };
947                let tracking_job = creating_streaming_job.into_tracking_job();
948                self.finishing_jobs_collector
949                    .collect(epoch, job_id, (resps, tracking_job));
950            }
951        }
952        let mut observed_non_checkpoint = false;
953        self.finishing_jobs_collector.advance_collected();
954        let epoch_end_bound = self
955            .finishing_jobs_collector
956            .first_inflight_epoch()
957            .map_or(Unbounded, |epoch| Excluded(epoch.prev));
958        if let Some((epoch, resps, info)) = partial_graph_manager.start_completing(
959            self.partial_graph_id,
960            epoch_end_bound,
961            |_, resps, post_collect_command| {
962                observed_non_checkpoint = true;
963                self.handle_refresh_table_info(task, &resps);
964                self.database_info.apply_collected_command(
965                    &post_collect_command,
966                    &resps,
967                    hummock_version_stats,
968                );
969            },
970        ) {
971            self.handle_refresh_table_info(task, &resps);
972            self.database_info.apply_collected_command(
973                &info.post_collect_command,
974                &resps,
975                hummock_version_stats,
976            );
977            let mut resps_to_commit = resps.into_values().collect_vec();
978            let mut staging_commit_info = self.database_info.take_staging_commit_info();
979            if let Some((_, finished_jobs, _)) =
980                self.finishing_jobs_collector
981                    .take_collected_if(|collected_epoch| {
982                        assert!(epoch <= collected_epoch.prev);
983                        epoch == collected_epoch.prev
984                    })
985            {
986                finished_jobs
987                    .into_iter()
988                    .for_each(|(_, (resps, tracking_job))| {
989                        resps_to_commit.extend(resps);
990                        staging_commit_info.finished_jobs.push(tracking_job);
991                    });
992            }
993            {
994                let task = task.get_or_insert_default();
995                Command::collect_commit_epoch_info(
996                    &self.database_info,
997                    &info,
998                    &mut task.commit_info,
999                    resps_to_commit,
1000                    self.collect_backfill_pinned_upstream_log_epoch(),
1001                );
1002                self.completing_barrier = Some(info.barrier_info.prev_epoch());
1003                task.finished_jobs.extend(staging_commit_info.finished_jobs);
1004                task.finished_cdc_table_backfill
1005                    .extend(staging_commit_info.finished_cdc_table_backfill);
1006                task.epoch_infos
1007                    .try_insert(self.partial_graph_id, info)
1008                    .expect("non duplicate");
1009                task.commit_info
1010                    .truncate_tables
1011                    .extend(staging_commit_info.table_ids_to_truncate);
1012            }
1013        } else if observed_non_checkpoint
1014            && self.database_info.has_pending_finished_jobs()
1015            && !partial_graph_manager.has_pending_checkpoint_barrier(self.partial_graph_id)
1016        {
1017            periodic_barriers.force_checkpoint_in_next_barrier(self.database_id);
1018        }
1019        if !independent_jobs_task.is_empty() {
1020            let task = task.get_or_insert_default();
1021            for (job_id, epoch, resps, info) in independent_jobs_task {
1022                collect_independent_job_commit_epoch_info(
1023                    &mut task.commit_info,
1024                    epoch,
1025                    resps,
1026                    &info,
1027                );
1028                task.epoch_infos
1029                    .try_insert(to_partial_graph_id(self.database_id, Some(job_id)), info)
1030                    .expect("non duplicate");
1031            }
1032        }
1033    }
1034
1035    fn ack_completed(
1036        &mut self,
1037        partial_graph_manager: &mut PartialGraphManager,
1038        command_prev_epoch: Option<u64>,
1039        independent_job_epochs: Vec<(JobId, u64)>,
1040    ) {
1041        {
1042            if let Some(prev_epoch) = self.completing_barrier.take() {
1043                assert_eq!(command_prev_epoch, Some(prev_epoch));
1044                self.committed_epoch = Some(prev_epoch);
1045                partial_graph_manager.ack_completed(self.partial_graph_id, prev_epoch);
1046            } else {
1047                assert_eq!(command_prev_epoch, None);
1048            };
1049            for (job_id, epoch) in independent_job_epochs {
1050                if let Some(job) = self.independent_checkpoint_job_controls.get_mut(&job_id) {
1051                    job.ack_completed(partial_graph_manager, epoch);
1052                }
1053                // If the job is not found, it was dropped and already removed
1054                // by `on_partial_graph_reset` while the completing task was running.
1055            }
1056        }
1057    }
1058
1059    fn handle_refresh_table_info(
1060        &self,
1061        task: &mut Option<CompleteBarrierTask>,
1062        resps: &HashMap<WorkerId, BarrierCompleteResponse>,
1063    ) {
1064        let list_finished_info = resps
1065            .values()
1066            .flat_map(|resp| resp.list_finished_sources.clone())
1067            .collect::<Vec<_>>();
1068        if !list_finished_info.is_empty() {
1069            let task = task.get_or_insert_default();
1070            task.list_finished_source_ids.extend(list_finished_info);
1071        }
1072
1073        let load_finished_info = resps
1074            .values()
1075            .flat_map(|resp| resp.load_finished_sources.clone())
1076            .collect::<Vec<_>>();
1077        if !load_finished_info.is_empty() {
1078            let task = task.get_or_insert_default();
1079            task.load_finished_source_ids.extend(load_finished_info);
1080        }
1081
1082        let refresh_finished_table_ids: Vec<JobId> = resps
1083            .values()
1084            .flat_map(|resp| {
1085                resp.refresh_finished_tables
1086                    .iter()
1087                    .map(|table_id| table_id.as_job_id())
1088            })
1089            .collect::<Vec<_>>();
1090        if !refresh_finished_table_ids.is_empty() {
1091            let task = task.get_or_insert_default();
1092            task.refresh_finished_table_job_ids
1093                .extend(refresh_finished_table_ids);
1094        }
1095    }
1096}
1097
1098impl DatabaseCheckpointControl {
1099    /// Handle the new barrier from the scheduled queue and inject it.
1100    fn handle_new_barrier(
1101        &mut self,
1102        command: Option<(Command, Vec<Notifier>)>,
1103        checkpoint: bool,
1104        span: tracing::Span,
1105        partial_graph_manager: &mut PartialGraphManager,
1106        hummock_version_stats: &HummockVersionStats,
1107        worker_nodes: &HashMap<WorkerId, WorkerNode>,
1108    ) -> MetaResult<()> {
1109        let curr_epoch = self.state.in_flight_prev_epoch().next();
1110
1111        let (command, mut notifiers) = if let Some((command, notifiers)) = command {
1112            (Some(command), notifiers)
1113        } else {
1114            (None, vec![])
1115        };
1116
1117        debug_assert!(
1118            !matches!(
1119                &command,
1120                Some(Command::RescheduleIntent {
1121                    reschedule_plan: None,
1122                    ..
1123                })
1124            ),
1125            "reschedule intent should be resolved before injection"
1126        );
1127
1128        if let Some(Command::DropStreamingJobs {
1129            streaming_job_ids, ..
1130        }) = &command
1131        {
1132            if streaming_job_ids.len() > 1 {
1133                for job_to_cancel in streaming_job_ids {
1134                    if self
1135                        .independent_checkpoint_job_controls
1136                        .contains_key(job_to_cancel)
1137                    {
1138                        warn!(
1139                            job_id = %job_to_cancel,
1140                            "ignore multi-job cancel command on creating snapshot backfill streaming job"
1141                        );
1142                        for notifier in notifiers {
1143                            notifier
1144                                .notify_start_failed(anyhow!("cannot cancel creating snapshot backfill streaming job with other jobs, \
1145                                the job will continue creating until created or recovery. Please cancel the snapshot backfilling job in a single DDL ").into());
1146                        }
1147                        return Ok(());
1148                    }
1149                }
1150            } else if let Some(job_to_drop) = streaming_job_ids.iter().next()
1151                && let Some(job) = self
1152                    .independent_checkpoint_job_controls
1153                    .get_mut(job_to_drop)
1154            {
1155                let dropped = job.drop(&mut notifiers, partial_graph_manager);
1156                if dropped {
1157                    return Ok(());
1158                }
1159            }
1160        }
1161
1162        if let Some(Command::Throttle { jobs, .. }) = &command
1163            && jobs.len() > 1
1164            && let Some(independent_job_id) = jobs
1165                .iter()
1166                .find(|job| self.independent_checkpoint_job_controls.contains_key(*job))
1167        {
1168            warn!(
1169                job_id = %independent_job_id,
1170                "ignore multi-job throttle command on independent checkpoint job"
1171            );
1172            for notifier in notifiers {
1173                notifier.notify_start_failed(
1174                    anyhow!(
1175                        "cannot alter rate limit for independent checkpoint job with other jobs, \
1176                                the original rate limit will be kept during recovery."
1177                    )
1178                    .into(),
1179                );
1180            }
1181            return Ok(());
1182        };
1183
1184        if let Some(Command::RescheduleIntent {
1185            reschedule_plan: Some(reschedule_plan),
1186            ..
1187        }) = &command
1188            && !self.independent_checkpoint_job_controls.is_empty()
1189        {
1190            let blocked_job_ids =
1191                self.collect_reschedule_blocked_jobs_for_independent_jobs_inflight()?;
1192            let blocked_reschedule_job_ids = self.collect_reschedule_blocked_job_ids(
1193                &reschedule_plan.reschedules,
1194                &reschedule_plan.fragment_actors,
1195                &blocked_job_ids,
1196            );
1197            if !blocked_reschedule_job_ids.is_empty() {
1198                warn!(
1199                    blocked_reschedule_job_ids = ?blocked_reschedule_job_ids,
1200                    "reject reschedule fragments related to creating unreschedulable backfill jobs"
1201                );
1202                for notifier in notifiers {
1203                    notifier.notify_start_failed(
1204                        anyhow!(
1205                            "cannot reschedule jobs {:?} when creating jobs with unreschedulable backfill fragments",
1206                            blocked_reschedule_job_ids
1207                        )
1208                            .into(),
1209                    );
1210                }
1211                return Ok(());
1212            }
1213        }
1214
1215        if !matches!(&command, Some(Command::CreateStreamingJob { .. }))
1216            && self.database_info.is_empty()
1217        {
1218            assert!(
1219                self.independent_checkpoint_job_controls.is_empty(),
1220                "should not have snapshot backfill job when there is no normal job in database"
1221            );
1222            // skip the command when there is nothing to do with the barrier
1223            for mut notifier in notifiers {
1224                notifier.notify_started();
1225                notifier.notify_collected();
1226            }
1227            return Ok(());
1228        };
1229
1230        if let Some(Command::CreateStreamingJob {
1231            job_type:
1232                CreateStreamingJobType::SnapshotBackfill(_) | CreateStreamingJobType::BatchRefresh(_),
1233            ..
1234        }) = &command
1235            && self.state.is_paused()
1236        {
1237            warn!("cannot create streaming job with snapshot backfill when paused");
1238            for notifier in notifiers {
1239                notifier.notify_start_failed(
1240                    anyhow!("cannot create streaming job with snapshot backfill when paused",)
1241                        .into(),
1242                );
1243            }
1244            return Ok(());
1245        }
1246
1247        let barrier_info = self.state.next_barrier_info(checkpoint, curr_epoch);
1248        // Tracing related stuff
1249        barrier_info.prev_epoch.span().in_scope(|| {
1250            tracing::info!(target: "rw_tracing", epoch = barrier_info.curr_epoch(), "new barrier enqueued");
1251        });
1252        span.record("epoch", barrier_info.curr_epoch());
1253
1254        let epoch = barrier_info.epoch();
1255        let ApplyCommandInfo { jobs_to_wait } = match self.apply_command(
1256            command,
1257            &mut notifiers,
1258            barrier_info,
1259            partial_graph_manager,
1260            hummock_version_stats,
1261            worker_nodes,
1262        ) {
1263            Ok(info) => {
1264                assert!(notifiers.is_empty());
1265                info
1266            }
1267            Err(err) => {
1268                for notifier in notifiers {
1269                    notifier.notify_start_failed(err.clone());
1270                }
1271                fail_point!("inject_barrier_err_success");
1272                return Err(err);
1273            }
1274        };
1275
1276        // Record the in-flight barrier.
1277        self.enqueue_command(epoch, jobs_to_wait);
1278
1279        Ok(())
1280    }
1281}